国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語(yǔ)言|JavaScript|易語(yǔ)言|vb.net|

服務(wù)器之家 - 編程語(yǔ)言 - Java教程 - 從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

2021-12-06 13:14xuxh120 Java教程

這篇文章主要介紹了從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列,對(duì)正在學(xué)習(xí)這方面知識(shí)的小伙伴有很大的幫助,感興趣的小伙伴快來一起學(xué)習(xí)吧

一、背景

Disruptor是英國(guó)外匯交易公司LMAX開發(fā)的一個(gè)高性能隊(duì)列,研發(fā)的初衷是解決內(nèi)存隊(duì)列的延遲問題(在性能測(cè)試中發(fā)現(xiàn)竟然與I/O操作處于同樣的數(shù)量級(jí))。基于Disruptor開發(fā)的系統(tǒng)單線程能支撐每秒600萬(wàn)訂單,2010年在QCon演講后,獲得了業(yè)界關(guān)注。2011年,企業(yè)應(yīng)用軟件專家Martin Fowler專門撰寫長(zhǎng)文介紹。同年它還獲得了Oracle官方的Duke大獎(jiǎng)。

目前,包括Apache Storm、Camel、Log4j 2在內(nèi)的很多知名項(xiàng)目都應(yīng)用了Disruptor以獲取高性能。在美團(tuán)技術(shù)團(tuán)隊(duì)它也有不少應(yīng)用,有的項(xiàng)目架構(gòu)借鑒了它的設(shè)計(jì)機(jī)制。本文從實(shí)戰(zhàn)角度剖析了Disruptor的實(shí)現(xiàn)原理。

需要特別指出的是,這里所說的隊(duì)列是系統(tǒng)內(nèi)部的內(nèi)存隊(duì)列,而不是Kafka這樣的分布式隊(duì)列。另外,本文所描述的Disruptor特性限于3.3.4。

 

二、Java內(nèi)置隊(duì)列

介紹Disruptor之前,我們先來看一看常用的線程安全的內(nèi)置隊(duì)列有什么問題。Java的內(nèi)置隊(duì)列如下表所示。

隊(duì)列 有界性 數(shù)據(jù)結(jié)構(gòu)
ArrayBlockingQueue bounded 加鎖 arraylist
LinkedBlockingQueue optionally-bounded 加鎖 linkedlist
ConcurrentLinkedQueue unbounded 無鎖 linkedlist
LinkedTransferQueue unbounded 無鎖 linkedlist
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap

 

隊(duì)列的底層一般分成三種:數(shù)組、鏈表和堆。其中,堆一般情況下是為了實(shí)現(xiàn)帶有優(yōu)先級(jí)特性的隊(duì)列,暫且不考慮。

我們就從數(shù)組和鏈表兩種數(shù)據(jù)結(jié)構(gòu)來看,基于數(shù)組線程安全的隊(duì)列,比較典型的是ArrayBlockingQueue,它主要通過加鎖的方式來保證線程安全;基于鏈表的線程安全隊(duì)列分成LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也通過鎖的方式來實(shí)現(xiàn)線程安全,而后者以及上面表格中的LinkedTransferQueue都是通過原子變量compare and swap(以下簡(jiǎn)稱“CAS”)這種不加鎖的方式來實(shí)現(xiàn)的。

通過不加鎖的方式實(shí)現(xiàn)的隊(duì)列都是無界的(無法保證隊(duì)列的長(zhǎng)度在確定的范圍內(nèi));而加鎖的方式,可以實(shí)現(xiàn)有界隊(duì)列。在穩(wěn)定性要求特別高的系統(tǒng)中,為了防止生產(chǎn)者速度過快,導(dǎo)致內(nèi)存溢出,只能選擇有界隊(duì)列;同時(shí),為了減少Java的垃圾回收對(duì)系統(tǒng)性能的影響,會(huì)盡量選擇array/heap格式的數(shù)據(jù)結(jié)構(gòu)。這樣篩選下來,符合條件的隊(duì)列就只有ArrayBlockingQueue。

 

三、ArrayBlockingQueue的問題

ArrayBlockingQueue在實(shí)際使用過程中,會(huì)因?yàn)榧渔i和偽共享等出現(xiàn)嚴(yán)重的性能問題,我們下面來分析一下。

 

1.加鎖

現(xiàn)實(shí)編程過程中,加鎖通常會(huì)嚴(yán)重地影響性能。線程會(huì)因?yàn)楦?jìng)爭(zhēng)不到鎖而被掛起,等鎖被釋放的時(shí)候,線程又會(huì)被恢復(fù),這個(gè)過程中存在著很大的開銷,并且通常會(huì)有較長(zhǎng)時(shí)間的中斷,因?yàn)楫?dāng)一個(gè)線程正在等待鎖時(shí),它不能做任何其他事情。如果一個(gè)線程在持有鎖的情況下被延遲執(zhí)行,例如發(fā)生了缺頁(yè)錯(cuò)誤、調(diào)度延遲或者其它類似情況,那么所有需要這個(gè)鎖的線程都無法執(zhí)行下去。如果被阻塞線程的優(yōu)先級(jí)較高,而持有鎖的線程優(yōu)先級(jí)較低,就會(huì)發(fā)生優(yōu)先級(jí)反轉(zhuǎn)。

Disruptor論文中講述了一個(gè)實(shí)驗(yàn):

  • 這個(gè)測(cè)試程序調(diào)用了一個(gè)函數(shù),該函數(shù)會(huì)對(duì)一個(gè)64位的計(jì)數(shù)器循環(huán)自增5億次。
  • 機(jī)器環(huán)境:2.4G 6核
  • 運(yùn)算: 64位的計(jì)數(shù)器累加5億次

|Method | Time (ms) | |— | —|

|Single thread | 300|

|Single thread with CAS | 5,700|

|Single thread with lock | 10,000|

|Single thread with volatile write | 4,700|

|Two threads with CAS | 30,000|

|Two threads with lock | 224,000|

CAS操作比單線程無鎖慢了1個(gè)數(shù)量級(jí);有鎖且多線程并發(fā)的情況下,速度比單線程無鎖慢3個(gè)數(shù)量級(jí)。可見無鎖速度最快。

單線程情況下,不加鎖的性能 > CAS操作的性能 > 加鎖的性能。

在多線程情況下,為了保證線程安全,必須使用CAS或鎖,這種情況下,CAS的性能超過鎖的性能,前者大約是后者的8倍。

綜上可知,加鎖的性能是最差的。

 

a.關(guān)于鎖和CAS

保證線程安全一般分成兩種方式:鎖和原子變量。

 

b.鎖

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

圖1 通過加鎖的方式實(shí)現(xiàn)線程安全

采取加鎖的方式,默認(rèn)線程會(huì)沖突,訪問數(shù)據(jù)時(shí),先加上鎖再訪問,訪問之后再解鎖。通過鎖界定一個(gè)臨界區(qū),同時(shí)只有一個(gè)線程進(jìn)入。如上圖所示,Thread2訪問Entry的時(shí)候,加了鎖,Thread1就不能再執(zhí)行訪問Entry的代碼,從而保證線程安全。

下面是ArrayBlockingQueue通過加鎖的方式實(shí)現(xiàn)的offer方法,保證線程安全。

  1. public boolean offer(E e) {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. if (count == items.length)
  7. return false;
  8. else {
  9. insert(e);
  10. return true;
  11. }
  12. } finally {
  13. lock.unlock();
  14. }
  15. }

 

c.原子變量

原子變量能夠保證原子性的操作,意思是某個(gè)任務(wù)在執(zhí)行過程中,要么全部成功,要么全部失敗回滾,恢復(fù)到執(zhí)行之前的初態(tài),不存在初態(tài)和成功之間的中間狀態(tài)。例如CAS操作,要么比較并交換成功,要么比較并交換失敗。由CPU保證原子性。

通過原子變量可以實(shí)現(xiàn)線程安全。執(zhí)行某個(gè)任務(wù)的時(shí)候,先假定不會(huì)有沖突,若不發(fā)生沖突,則直接執(zhí)行成功;當(dāng)發(fā)生沖突的時(shí)候,則執(zhí)行失敗,回滾再重新操作,直到不發(fā)生沖突。

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

圖2 通過原子變量CAS實(shí)現(xiàn)線程安全

如圖所示,Thread1和Thread2都要把Entry加1。若不加鎖,也不使用CAS,有可能Thread1取到了myValue=1,Thread2也取到了myValue=1,然后相加,Entry中的value值為2。這與預(yù)期不相符,我們預(yù)期的是Entry的值經(jīng)過兩次相加后等于3。

CAS會(huì)先把Entry現(xiàn)在的value跟線程當(dāng)初讀出的值相比較,若相同,則賦值;若不相同,則賦值執(zhí)行失敗。一般會(huì)通過while/for循環(huán)來重新執(zhí)行,直到賦值成功。

代碼示例是AtomicInteger的getAndAdd方法。CAS是CPU的一個(gè)指令,由CPU保證原子性。


	
  1. /**
  2. * Atomically adds the given value to the current value.
  3. *
  4. * @param delta the value to add
  5. * @return the previous value
  6. */
  7. public final int getAndAdd(int delta) {
  8. for (;;) {
  9. int current = get();
  10. int next = current + delta;
  11. if (compareAndSet(current, next))
  12. return current;
  13. }
  14. }
  15.  
  16. /**
  17. * Atomically sets the value to the given updated value
  18. * if the current value {@code ==} the expected value.
  19. *
  20. * @param expect the expected value
  21. * @param update the new value
  22. * @return true if successful. False return indicates that
  23. * the actual value was not equal to the expected value.
  24. */
  25. public final boolean compareAndSet(int expect, int update) {
  26. return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  27. }

在高度競(jìng)爭(zhēng)的情況下,鎖的性能將超過原子變量的性能,但是更真實(shí)的競(jìng)爭(zhēng)情況下,原子變量的性能將超過鎖的性能。同時(shí)原子變量不會(huì)有死鎖等活躍性問題。

 

2.偽共享

 

a.什么是共享

下圖是計(jì)算的基本結(jié)構(gòu)。L1、L2、L3分別表示一級(jí)緩存、二級(jí)緩存、三級(jí)緩存,越靠近CPU的緩存,速度越快,容量也越小。所以L1緩存很小但很快,并且緊靠著在使用它的CPU內(nèi)核;L2大一些,也慢一些,并且仍然只能被一個(gè)單獨(dú)的CPU核使用;L3更大、更慢,并且被單個(gè)插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

圖3 計(jì)算機(jī)CPU與緩存示意圖

當(dāng)CPU執(zhí)行運(yùn)算的時(shí)候,它先去L1查找所需的數(shù)據(jù)、再去L2、然后是L3,如果最后這些緩存中都沒有,所需的數(shù)據(jù)就要去主內(nèi)存拿。走得越遠(yuǎn),運(yùn)算耗費(fèi)的時(shí)間就越長(zhǎng)。所以如果你在做一些很頻繁的事,你要盡量確保數(shù)據(jù)在L1緩存中。

另外,線程之間共享一份數(shù)據(jù)的時(shí)候,需要一個(gè)線程把數(shù)據(jù)寫回主存,而另一個(gè)線程訪問主存中相應(yīng)的數(shù)據(jù)。

下面是從CPU訪問不同層級(jí)數(shù)據(jù)的時(shí)間概念:

從CPU到 大約需要的CPU周期 大約需要的時(shí)間
主存 - 約60-80ns
QPI 總線傳輸(between sockets, not drawn) - 約20ns
L3 cache 約40-45 cycles 約15ns
L2 cache 約10 cycles 約3ns
L1 cache 約3-4 cycles 約1ns
寄存器 1 cycle -

可見CPU讀取主存中的數(shù)據(jù)會(huì)比從L1中讀取慢了近2個(gè)數(shù)量級(jí)。

 

b.緩存行

Cache是由很多個(gè)cache line組成的。每個(gè)cache line通常是64字節(jié),并且它有效地引用主內(nèi)存中的一塊兒地址。一個(gè)Java的long類型變量是8字節(jié),因此在一個(gè)緩存行中可以存8個(gè)long類型的變量。

CPU每次從主存中拉取數(shù)據(jù)時(shí),會(huì)把相鄰的數(shù)據(jù)也存入同一個(gè)cache line。

在訪問一個(gè)long數(shù)組的時(shí)候,如果數(shù)組中的一個(gè)值被加載到緩存中,它會(huì)自動(dòng)加載另外7個(gè)。因此你能非常快的遍歷這個(gè)數(shù)組。事實(shí)上,你可以非常快速的遍歷在連續(xù)內(nèi)存塊中分配的任意數(shù)據(jù)結(jié)構(gòu)。

下面的例子是測(cè)試?yán)胏ache line的特性和不利用cache line的特性的效果對(duì)比。

  1. package com.meituan.FalseSharing;
  2.  
  3. /**
  4. * @author gongming
  5. * @description
  6. * @date 16/6/4
  7. */
  8. public class CacheLineEffect {
  9. //考慮一般緩存行大小是64字節(jié),一個(gè) long 類型占8字節(jié)
  10. static long[][] arr;
  11.  
  12. public static void main(String[] args) {
  13. arr = new long[1024 * 1024][];
  14. for (int i = 0; i < 1024 * 1024; i++) {
  15. arr[i] = new long[8];
  16. for (int j = 0; j < 8; j++) {
  17. arr[i][j] = 0L;
  18. }
  19. }
  20. long sum = 0L;
  21. long marked = System.currentTimeMillis();
  22. for (int i = 0; i < 1024 * 1024; i+=1) {
  23. for(int j =0; j< 8;j++){
  24. sum = arr[i][j];
  25. }
  26. }
  27. System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");
  28.  
  29. marked = System.currentTimeMillis();
  30. for (int i = 0; i < 8; i+=1) {
  31. for(int j =0; j< 1024 * 1024;j++){
  32. sum = arr[j][i];
  33. }
  34. }
  35. System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");
  36. }
  37. }

在2G Hz、2核、8G內(nèi)存的運(yùn)行環(huán)境中測(cè)試,速度差一倍。

結(jié)果:

Loop times:30ms Loop times:65ms

 

c.什么是偽共享

ArrayBlockingQueue有三個(gè)成員變量: - takeIndex:需要被取走的元素下標(biāo) - putIndex:可被元素插入的位置的下標(biāo) - count:隊(duì)列中元素的數(shù)量

這三個(gè)變量很容易放到一個(gè)緩存行中,但是之間修改沒有太多的關(guān)聯(lián)。所以每次修改,都會(huì)使之前緩存的數(shù)據(jù)失效,從而不能完全達(dá)到共享的效果。

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

圖4 ArrayBlockingQueue偽共享示意圖

如上圖所示,當(dāng)生產(chǎn)者線程put一個(gè)元素到ArrayBlockingQueue時(shí),putIndex會(huì)修改,從而導(dǎo)致消費(fèi)者線程的緩存中的緩存行無效,需要從主存中重新讀取。

這種無法充分使用緩存行特性的現(xiàn)象,稱為偽共享。

對(duì)于偽共享,一般的解決方案是,增大數(shù)組元素的間隔使得由不同線程存取的元素位于不同的緩存行上,以空間換時(shí)間。

  1. package com.meituan.FalseSharing;
  2.  
  3. public class FalseSharing implements Runnable{
  4. public final static long ITERATIONS = 500L * 1000L * 100L;
  5. private int arrayIndex = 0;
  6.  
  7. private static ValuePadding[] longs;
  8. public FalseSharing(final int arrayIndex) {
  9. this.arrayIndex = arrayIndex;
  10. }
  11.  
  12. public static void main(final String[] args) throws Exception {
  13. for(int i=1;i<10;i++){
  14. System.gc();
  15. final long start = System.currentTimeMillis();
  16. runTest(i);
  17. System.out.println("Thread num "+i+" duration = " + (System.currentTimeMillis() - start));
  18. }
  19.  
  20. }
  21.  
  22. private static void runTest(int NUM_THREADS) throws InterruptedException {
  23. Thread[] threads = new Thread[NUM_THREADS];
  24. longs = new ValuePadding[NUM_THREADS];
  25. for (int i = 0; i < longs.length; i++) {
  26. longs[i] = new ValuePadding();
  27. }
  28. for (int i = 0; i < threads.length; i++) {
  29. threads[i] = new Thread(new FalseSharing(i));
  30. }
  31.  
  32. for (Thread t : threads) {
  33. t.start();
  34. }
  35.  
  36. for (Thread t : threads) {
  37. t.join();
  38. }
  39. }
  40.  
  41. public void run() {
  42. long i = ITERATIONS + 1;
  43. while (0 != --i) {
  44. longs[arrayIndex].value = 0L;
  45. }
  46. }
  47.  
  48. public final static 編程客棧class ValuePadding {
  49. protected long p1, p2, p3, p4, p5, p6, p7;
  50. protected volatile long value = 0L;
  51. protected long p9, p10, p11, p12, p13, p14;
  52. protected long p15;
  53. }
  54. public final static class ValueNoPadding {
  55. // protected long p1, p2, p3, p4, p5, p6, p7;
  56. protected volatile long value = 0L;
  57. // protected long p9, p10, p11, p12, p13, p14, p15;
  58. }
  59. }

在2G Hz,2核,8G內(nèi)存, jdk 1.7.0_45 的運(yùn)行環(huán)境下,使用了共享機(jī)制比沒有使用共享機(jī)制,速度快了4倍左右。

結(jié)果:

  • Thread num 1 duration = 447
  • Thread num 2 duration = 463
  • Thread num 3 duration = 454
  • Thread num 4 duration = 464
  • Thread num 5 duration = 561
  • Thread num 6 duration = 606
  • Thread num 7 duration = 684
  • Thread num 8 duration = 870
  • Thread num 9 duration = 823

把代碼中ValuePadding都替換為ValueNoPadding后的結(jié)果:

  • Thread num 1 duration = 446
  • Thread num 2 duration = 2549
  • Thread num 3 duration = 2898
  • Thread num 4 duration = 3931
  • Thread num 5 duration = 4716
  • Thread num 6 duration = 5424
  • Thread num 7 duration = 4868
  • Thread num 8 duration = 4595
  • Thread num 9 duration = 4540

備注:在jdk1.8中,有專門的注解@Contended來避免偽共享,更優(yōu)雅地解決問題。

 

四、Disruptor的設(shè)計(jì)方案

Disruptor通過以下設(shè)計(jì)來解決隊(duì)列速度慢的問題:

  • 環(huán)形數(shù)組結(jié)構(gòu)

為了避免垃圾回收,采用數(shù)組而非鏈表。同時(shí),數(shù)組對(duì)處理器的緩存機(jī)制更加友好。

  • 元素位置定位

數(shù)組長(zhǎng)度2^n,通過位運(yùn)算,加快定位的速度。下標(biāo)采取遞增的形式。不用擔(dān)心index溢出的問題。index是long類型,即使100萬(wàn)QPS的處理速度,也需要30萬(wàn)年才能用完。

  • 無鎖設(shè)計(jì)

每個(gè)生產(chǎn)者或者消費(fèi)者線程,會(huì)先申請(qǐng)可以操作的元素在數(shù)組中的位置,申請(qǐng)到之后,直接在該位置寫入或者讀取數(shù)據(jù)。

下面忽略數(shù)組的環(huán)形結(jié)構(gòu),介紹一下如何實(shí)現(xiàn)無鎖設(shè)計(jì)。整個(gè)過程通過原子變量CAS,保證操作的線程安全。

 

1.一個(gè)生產(chǎn)者

寫數(shù)據(jù)

生產(chǎn)者單線程寫數(shù)據(jù)的流程比較簡(jiǎn)單:

  • 1.申請(qǐng)寫入m個(gè)元素;
  • 2.若是有m個(gè)元素可以入,則返回最大的序列號(hào)。這兒主要判斷是否會(huì)覆蓋未讀的元素;
  • 3.若是返回的正確,則生產(chǎn)者開始寫入元素。

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

圖5 單個(gè)生產(chǎn)者生產(chǎn)過程示意圖

 

2.多個(gè)生產(chǎn)者

多個(gè)生產(chǎn)者的情況下,會(huì)遇到“如何防止多個(gè)線程重復(fù)寫同一個(gè)元素”的問題。Disruptor的解決方法是,每個(gè)線程獲取不同的一段數(shù)組空間進(jìn)行操作。這個(gè)通過CAS很容易達(dá)到。只需要在分配元素的時(shí)候,通過CAS判斷一下這段空間是否已經(jīng)分配出去即可。

但是會(huì)遇到一個(gè)新問題:如何防止讀取的時(shí)候,讀到還未寫的元素。Disruptor在多個(gè)生產(chǎn)者的情況下,引入了一個(gè)與Ring Buffer大小相同的buffer:a編程客棧vailable Buffer。當(dāng)某個(gè)位置寫入成功的時(shí)候,便把a(bǔ)vailble Buffer相應(yīng)的位置置位,標(biāo)記為寫入成功。讀取的時(shí)候,會(huì)遍歷available Buffer,來判斷元素是否已經(jīng)就緒。

下面分讀數(shù)據(jù)和寫數(shù)據(jù)兩種情況介紹。

 

a.讀數(shù)據(jù)

生產(chǎn)者多線程寫入的情況會(huì)復(fù)雜很多:

  • 申請(qǐng)讀取到序號(hào)n;
  • 若writer cursor >= n,這時(shí)仍然無法確定連續(xù)可讀的最大下標(biāo)。從reader cursor開始讀取available Buffer,一直查到第一個(gè)不可用的元素,然后返回最大連續(xù)可讀元素的位置;
  • 消費(fèi)者讀取元素。

如下圖所示,讀線程讀到下標(biāo)為2的元素,三個(gè)線程Writer1/Writer2/Writer3正在向RingBuffer相應(yīng)位置寫數(shù)據(jù),寫線程被分配到的最大元素下標(biāo)是11。

讀線程申請(qǐng)讀取到下標(biāo)從3到11的元素,判斷writer cursor>=11。然后開始讀取availableBuffer,從3開始,往后讀取,發(fā)現(xiàn)下標(biāo)為7的元素沒有生產(chǎn)成功,于是WaitFor(11)返回6。

然后,消費(fèi)者讀取下標(biāo)從3到6共計(jì)4個(gè)元素。

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

圖6 多個(gè)生產(chǎn)者情況下,消費(fèi)者消費(fèi)過程示意圖

 

b.寫數(shù)據(jù)

多個(gè)生產(chǎn)者寫入的時(shí)候:

  • 申請(qǐng)寫入m個(gè)元素;
  • 若是有m個(gè)元素可以寫入,則返回最大的序列號(hào)。每個(gè)生產(chǎn)者會(huì)被分配一段獨(dú)享的空間;
  • 生產(chǎn)者寫入元素,寫入元素的同時(shí)設(shè)置available Buffer里面相應(yīng)的位置,以標(biāo)記自己哪些位置是已經(jīng)寫入成功的。

如下圖所示,Writer1和Writer2兩個(gè)線程寫入數(shù)組,都申請(qǐng)可寫的數(shù)組空間。Writer1被分配了下標(biāo)3到下表5的空間,Writer2被分配了下標(biāo)6到下標(biāo)9的空間。

Writer1寫入下標(biāo)3位置的元素,同時(shí)把a(bǔ)vailable Buffer相應(yīng)位置置位,標(biāo)記已經(jīng)寫入成功,往后移一位,開始寫下標(biāo)4位置的元素。Writer2同樣的方式。最終都寫入完成。

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

圖7 多個(gè)生產(chǎn)者情況下編程客棧,生產(chǎn)者生產(chǎn)過程示意圖

防止不同生產(chǎn)者對(duì)同一段空間寫入的代碼,如下所示:

  1. public long tryNext(int n) throws InsufficientCapacityException
  2. {
  3. if (n < 1)
  4. {
  5. throw new IllegalArgumentException("n must be > 0");
  6. }
  7.  
  8. long current;
  9. long next;
  10.  
  11. do
  12. {
  13. current = cursor.get();
  14. next = current + n;
  15.  
  16. if (!hasAvailableCapacity(gatingSequences, n, current))
  17. {
  18. throw InsufficientCapacityException.INSTANCE;
  19. }
  20. }
  21. while (!cursor.compareAndSet(current, next));
  22.  
  23. return next;
  24. }

通過do/while循環(huán)的條件cursor.compareAndSet(current, next),來判斷每次申請(qǐng)的空間是否已經(jīng)被其他生產(chǎn)者占據(jù)。假如已經(jīng)被占據(jù),該函數(shù)會(huì)返回失敗,While循環(huán)重新執(zhí)行,申請(qǐng)寫入空間。

消費(fèi)者的流程與生產(chǎn)者非常類似,這兒就不多描述了。

 

五、總結(jié)

Disruptor通過精巧的無鎖設(shè)計(jì)實(shí)現(xiàn)了在高并發(fā)情形下的高性能。

在美團(tuán)內(nèi)部,很多高并發(fā)場(chǎng)景借鑒了Disruptor的設(shè)計(jì),減少競(jìng)爭(zhēng)的強(qiáng)度。其設(shè)計(jì)思想可以擴(kuò)展到分布式場(chǎng)景,通過無鎖設(shè)計(jì),來提升服務(wù)性能。

使用Disruptor比使用ArrayBlockingQueue略微復(fù)雜,為方便讀者上手,增加代碼樣例。

代碼實(shí)現(xiàn)的功能:每10ms向disruptor中插入一個(gè)元素,消費(fèi)者讀取數(shù)據(jù),并打印到終端。詳細(xì)邏輯請(qǐng)細(xì)讀代碼。

以下代碼基于3.3.4版本的Disruptor包。

  1. package com.meituan.Disruptor;
  2.  
  3. /**
  4. * @description disruptor代碼樣例。每10ms向disruptor中插入一個(gè)元素,消費(fèi)者讀取數(shù)據(jù),并打印到終端
  5. */
  6. import com.lmax.disruptor.*;
  7. import com.lmax.disruptor.dsl.Disruptor;
  8. import com.lmax.disruptor.dsl.ProducerType;
  9.  
  10. import java.util.concurrent.ThreadFactory;
  11.  
  12.  
  13. public class DisruptorMain
  14. {
  15. public static void main(String[] args) throws Exception
  16. {
  17. // 隊(duì)列中的元素
  18. class Element {
  19.  
  20. private int value;
  21.  
  22. public int get(){
  23. return value;
  24. }
  25.  
  26. public void set(int value){
  27. this.value= value;
  28. }
  29.  
  30. }
  31.  
  32. // 生產(chǎn)者的線程工廠
  33. ThreadFactory threadFactory = new ThreadFactory(){
  34. @Override
  35. public Thread newThread(Runnable r) {
  36. return new Thread(r, "simpleThread");
  37. }
  38. };
  39.  
  40. // RingBuffer生產(chǎn)工廠,初始化RingBuffer的時(shí)候使用
  41. EventFactory<Element> factory = new EventFactory<Element>() {
  42. @Override
  43. public Element newInstance() {
  44. return new Element();
  45. }
  46. };
  47.  
  48. // 處理Event的handler
  49. EventHandler<Element> handler = new EventHandler<Element>(){
  50. @Override
  51. public void onEvent(Element element, long sequence, boolean endOfBatch)
  52. {
  53. System.out.println("Element: " + element.get());
  54. }
  55. };
  56.  
  57. // 阻塞策略
  58. BlockingWaitStrategy strategy = new BlockingWaitStrategy();
  59.  
  60. // 指定RingBuffer的大小
  61. int bufferSize = 16;
  62.  
  63. // 創(chuàng)建disruptor,采用單生產(chǎn)者模式
  64. Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
  65.  
  66. // 設(shè)置EventHandler
  67. disruptor.handleEventsWith(handler);
  68.  
  69. // 啟動(dòng)disruptor的線程
  70. disruptor.start();
  71.  
  72. RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
  73.  
  74. for (int l = 0; true; l++)
  75. {
  76. // 獲取下一個(gè)可用位置的下標(biāo)
  77. long sequence = ringBuffer.next();
  78. try
  79. {
  80. // 返回可用位置的元素
  81. Element event = ringBuffer.get(sequence);
  82. // 設(shè)置該位置元素的值
  83. event.set(l);
  84. }
  85. finally
  86. {
  87. ringBuffer.publish(sequence);
  88. }
  89. Thread.sleep(10);
  90. }
  91. }
  92. }

 

六、性能

以下面這些模式測(cè)試性能:

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

吞吐量測(cè)試數(shù)據(jù)(每秒的數(shù)量)如下。

環(huán)境: - CPU:Intel Core i7 860 @ 2.8 GHz without HT - JVM:Java 1.6.0_25 64-bit - OS:Windows 7

- ABQ Disruptor
Unicast: 1P – 1C 5,339,256 25,998,336
Pipeline: 1P – 3C 2,128,918 16,806,157
Sequencer: 3P – 1C 5,539,531 13,403,268
Multicast: 1P – 3C 1,077,384 9,377,871
Diamond: 1P – 3C 2,113,941 16,143,613

環(huán)境:

  • CPU:Intel Core i7-2720QM
  • JVM:Java 1.6.0_25 64-bit
  • OS:Ubuntu 11.04
- ABQ Disruptor
Unicast: 1P – 1C 4,057,453 22,381,378
Pipeline: 1P – 3C 2,006,903 15,857,913
Sequencer: 3P – 1C 2,056,118 14,540,519
Multicast: 1P – 3C 260,733 10,860,121
Diamond: 1P – 3C 2,082,725 15,295,197

依據(jù)并發(fā)競(jìng)爭(zhēng)的激烈程度的不同,Disruptor比ArrayBlockingQueue吞吐量快4~7倍。

按照Pipeline: 1P – 3C的連接模式測(cè)試延遲,生產(chǎn)者兩次寫入之間的延遲為1ms。

運(yùn)行環(huán)境:

CPU:2.2GHz Core i7-2720QM

Java: 1.6.0_25 64-bit

OS:Ubuntu 11.04.

- Array Blocking Queue (ns) Disruptor (ns)
99% observations less than 2,097,152 128
99.99% observations less than 4,194,304 8,192
Max Latency 5,069,086 175,567
Mean Latency 32,757 52
Min Latency 145 29

可見,平均延遲差了3個(gè)數(shù)量級(jí)。

 

七、等待策略

 

生產(chǎn)者的等待策略

暫時(shí)只有休眠1ns。

LockSupport.parkNanos(1);

 

消費(fèi)者的等待策略

名稱 措施 適用場(chǎng)景
BlockingWaitStrategy 加鎖 CPU資源緊缺,吞吐量和延遲并不重要的場(chǎng)景
BusySpinWaitStrategy 自旋 通過不斷重試,減少切換線程導(dǎo)致的系統(tǒng)調(diào)用,而降低延遲。推薦在線程綁定到固定的CPU的場(chǎng)景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定義策略 CPU資源緊缺,吞吐量和延遲并不重要的場(chǎng)景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU資源之間有很好的折中。延遲不均勻
TimeoutBlockingWaitStrategy 加鎖,有超時(shí)限制 CPU資源緊缺,吞吐量和延遲并不重要的場(chǎng)景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU資源之間有很好的折中。延遲比較均勻

 

八、Log4j 2應(yīng)用場(chǎng)景

Log4j 2相對(duì)于Log4j 1最大的優(yōu)勢(shì)在于多線程并發(fā)場(chǎng)景下性能更優(yōu)。該特性源自于Log4j 2的異步模式采用了Disruptor來處理。 在Log4j 2的配置文件中可以配置WaitStrategy,默認(rèn)是Timeout策略。下面是Log4j 2中對(duì)WaitStrategy的配置官方文檔:

System

Property

Default Value Description

AsyncLogger.

WaitStrategy

Timeout Valid values: Block, Timeout, Sleep, Yield. Block is a strategy that uses a lock and condition variable for the I/O thread waiting for log events. Block can be used when throughput and low-latency are not as important as CPU resource. Recommended for resource constrained/virtualised environments. Timeout is a variation of the Block strategy that will periodically wake up from the lock condition await() call. This ensures that if a notification is missed somehow the consumer thread is not stuck but will recover with a small latency delay (default 10ms). Sleep is a strategy that initially spins, then uses a Thread.yield(), and eventually parks for the minimum number of nanos the OS and JVM will allow while the I/O thread is waiting for log events. Sleep is a good compromise between performance and CPU resource. This strategy has very low impact on the application thread, in exchange for some additional latency for actually getting the message logged. Yield is a strategy that uses a Thread.yield() for waiting for log events after an initially spinning. Yield is a good compromise between performance and CPU resource, but may use more CPU than Sleep in order to get the message logged to disk sooner.

 

1.性能差異

loggers all async采用的是Disruptor,而Async Appender采用的是ArrayBlockingQueue隊(duì)列。

由圖可見,單線程情況下,loggers all async與Async Appender吞吐量相差不大,但是在64個(gè)線程的時(shí)候,loggers all async的吞吐量比Async Appender增加了12倍,是Sync模式的68倍。

從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列

圖8 Log4j 2各個(gè)模式性能比較

美團(tuán)在公司內(nèi)部統(tǒng)一推行日志接入規(guī)范,要求必須使用Log4j 2,使普通單機(jī)QPS的上限不再只停留在幾千,極高地提升了服務(wù)性能。

參考文檔

  • http://brokendreams.iteye.com/blog/2255720
  • http://ifeve.com/dissecting-disruptor-whats-so-special/
  • https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
  • https://lmax-exchange.github.io/disruptor/
  • https://logging.apache.org/log4j/2.x/manual/async.html
  • https://tech.meituan.com/2016/11/18/disruptor.html

到此這篇關(guān)于從實(shí)戰(zhàn)角度詳解Disruptor高性能隊(duì)列的文章就介紹到這了,更多相關(guān)Disruptor隊(duì)列內(nèi)容請(qǐng)搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!

原文鏈接:https://www.cnblogs.com/xuxh120/p/15186964.html

延伸 · 閱讀

精彩推薦
  • Java教程20個(gè)非常實(shí)用的Java程序代碼片段

    20個(gè)非常實(shí)用的Java程序代碼片段

    這篇文章主要為大家分享了20個(gè)非常實(shí)用的Java程序片段,對(duì)java開發(fā)項(xiàng)目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學(xué)習(xí)使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程Java實(shí)現(xiàn)搶紅包功能

    Java實(shí)現(xiàn)搶紅包功能

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)搶紅包功能,采用多線程模擬多人同時(shí)搶紅包,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程Java8中Stream使用的一個(gè)注意事項(xiàng)

    Java8中Stream使用的一個(gè)注意事項(xiàng)

    最近在工作中發(fā)現(xiàn)了對(duì)于集合操作轉(zhuǎn)換的神器,java8新特性 stream,但在使用中遇到了一個(gè)非常重要的注意點(diǎn),所以這篇文章主要給大家介紹了關(guān)于Java8中S...

    阿杜7482021-02-04
  • Java教程xml與Java對(duì)象的轉(zhuǎn)換詳解

    xml與Java對(duì)象的轉(zhuǎn)換詳解

    這篇文章主要介紹了xml與Java對(duì)象的轉(zhuǎn)換詳解的相關(guān)資料,需要的朋友可以參考下...

    Java教程網(wǎng)2942020-09-17
  • Java教程升級(jí)IDEA后Lombok不能使用的解決方法

    升級(jí)IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級(jí),尋思已經(jīng)有好久沒有升過級(jí)了。升級(jí)完畢重啟之后,突然發(fā)現(xiàn)好多錯(cuò)誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關(guān)于小米推送Java代碼,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧...

    富貴穩(wěn)中求8032021-07-12
  • Java教程Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決

    Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進(jìn)去或缺失數(shù)據(jù)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望...

    spcoder14552021-10-18
主站蜘蛛池模板: 168黄网| 天天澡天天狠天天天做 | 亚洲精品第一区在线观看 | 毛片在线视频 | 亚洲欧洲视频 | 精品国产一区二区三区四区 | 国产精品久久久久久久久久久久久久久久 | 日韩精品毛片免费看 | 亚洲国产精品久久久 | 香蕉成人啪国产精品视频综合网 | 欧美一级欧美三级在线观看 | 骚视频在线观看 | 韩国久久 | 在线不卡a资源高清 | 欧美日韩成人影院 | 久久久久久国产精品 | 亚洲精品成人天堂一二三 | 男女激情网址 | 91在线精品一区二区 | 自拍偷拍亚洲欧美 | 一区久久 | 色综合久久久久久久久久久 | 久久高清精品 | 亚洲一区二区三 | 久久久一级 | 激情久久久 | 日本高清视频在线播放 | 青青草草 | 人人爽人人爽人人片av | 久久精品在线 | 亚洲精品九九 | 欧美成人黄色 | 亚洲精品乱码久久久久久金桔影视 | 欧美久久综合 | 精品伊人久久 | 亚洲视频精品一区 | 亚洲精品9999 | 成人爽a毛片一区二区免费 成年人毛片视频 | 黄色福利视频 | 午夜小电影 | 日韩成人免费 |