go-disruptor: A High-Performance Messaging Framework

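As Java programmers know, Disruptor is a high-performance inter-thread messaging framework developed by LMAX. It passes messages between threads inside a single JVM process.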

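Disruptor is fast enough that LMAX uses it to process 6 million orders per second, achieving a throughput of 100K+ at 1 microsecond latency. Is there a comparable library in the Go ecosystem?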

go-disruptor is a port of the Java Disruptor. Its API design is similar to Java Disruptor's, and it is not hard to use.

Its performance is the focus of this article and is covered below.

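Because of its high performance, Disruptor has attracted a lot of attention, and a series of articles and resources introduce it.

There are also Chinese translations and introductions, for example:

  • the Disruptor topic on 并发编程网 (Concurrent Programming Network)
  • Feng Zhongyan (封仲淹) of Alibaba: "How to Use Disruptor Elegantly"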

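Disruptor was developed by LMAX, whose goal is to become the fastest trading platform in the world. To reach its low-latency, high-throughput targets, LMAX had to build a high-performance producer-consumer messaging framework, because Java's built-in queues still add noticeable latency. The figure below compares Disruptor with the JDK's ArrayBlockingQueue.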

The X axis shows latency and the Y axis the number of operations. Disruptor clearly has lower latency and higher throughput.
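One core idea behind Disruptor is a preallocated ring buffer whose slots are addressed by sequence numbers: producers and consumers coordinate only through those sequence counters instead of locking a shared queue. The Go sketch below is a minimal single-producer/single-consumer version of that idea, assuming a capacity of 1024 chosen purely for illustration. The names `spscRing`, `publish` and `consume` are hypothetical; this is not go-disruptor's implementation, and its spin loops yield with `runtime.Gosched` so the sketch also behaves on a single CPU.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// Hypothetical capacity for illustration. It must be a power of two so that
// seq&bufferMask maps a sequence number onto a slot.
const (
	bufferSize = 1024
	bufferMask = bufferSize - 1
)

// spscRing is a minimal single-producer/single-consumer ring buffer.
// Slots are preallocated once; the two sides coordinate only through the
// written/consumed counters, with no locks and no per-message allocation.
type spscRing struct {
	slots    [bufferSize]int64
	written  int64 // next sequence the producer will write
	consumed int64 // next sequence the consumer will read
}

// publish waits for a free slot, stores v, then makes it visible to the consumer.
func (r *spscRing) publish(v int64) {
	seq := atomic.LoadInt64(&r.written) // only the producer modifies this field
	for seq-atomic.LoadInt64(&r.consumed) >= bufferSize {
		runtime.Gosched() // ring full; a real busy-spin strategy would not yield
	}
	r.slots[seq&bufferMask] = v
	atomic.StoreInt64(&r.written, seq+1) // the slot write happens before this store
}

// consume waits for a published value, reads it, then frees the slot.
func (r *spscRing) consume() int64 {
	seq := atomic.LoadInt64(&r.consumed) // only the consumer modifies this field
	for seq >= atomic.LoadInt64(&r.written) {
		runtime.Gosched() // ring empty
	}
	v := r.slots[seq&bufferMask]
	atomic.StoreInt64(&r.consumed, seq+1)
	return v
}

func main() {
	const n = 100000
	r := &spscRing{}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // the single producer
		defer wg.Done()
		for i := int64(0); i < n; i++ {
			r.publish(i)
		}
	}()
	var sum int64
	for i := int64(0); i < n; i++ { // the single consumer
		v := r.consume()
		if v != i {
			panic("out of order")
		}
		sum += v
	}
	wg.Wait()
	fmt.Println(sum) // 4999950000
}
```

Because the values wrap around the 1024-slot ring many times, the in-order check in `main` also exercises the full/empty conditions.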

Disruptor supports several usage models and configurations; the official test results for some of these models can be found here.

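What I actually want to do is compare the performance of go-disruptor with the official Java Disruptor. Disruptor can be configured in several ways (single or multiple producers, single or multiple consumers), and performance differs considerably between configurations. A fair comparison must therefore use the same configuration on both sides, even though the two are written in different languages.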

The test scenario I chose uses three producers and one consumer. With a single producer, Java Disruptor's performance would be several times higher.

Java Disruptor

The main Java test class is as follows:

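```java
import static com.lmax.disruptor.RingBuffer.createMultiProducer;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    private static final int NUM_PUBLISHERS = 3; // Runtime.getRuntime().availableProcessors();
    private static final int BUFFER_SIZE = 1024 * 64;
    private static final long ITERATIONS = 1000L * 1000L * 20L;
    // One thread per publisher plus one for the consumer (BatchEventProcessor).
    private final ExecutorService executor = Executors.newFixedThreadPool(NUM_PUBLISHERS + 1, DaemonThreadFactory.INSTANCE);
    // The publishers and the main thread meet here so that all publishers start together.
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);

    // Multi-producer ring buffer; the consumer busy-spins while waiting for events.
    private final RingBuffer<ValueEvent> ringBuffer = createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());

    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler);
    private final ValueBatchPublisher[] valuePublishers = new ValueBatchPublisher[NUM_PUBLISHERS];

    {
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            // Each publisher writes ITERATIONS / NUM_PUBLISHERS events in batches of 16.
            valuePublishers[i] = new ValueBatchPublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS, 16);
        }

        // Publishers must not overwrite slots the consumer has not processed yet.
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    }

    public long runDisruptorPass() throws Exception
    {
        final CountDownLatch latch = new CountDownLatch(1);
        // The handler counts the latch down when it sees the last expected sequence.
        handler.reset(latch, batchEventProcessor.getSequence().get() + ((ITERATIONS / NUM_PUBLISHERS) * NUM_PUBLISHERS));

        Future<?>[] futures = new Future[NUM_PUBLISHERS];
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i] = executor.submit(valuePublishers[i]);
        }
        executor.submit(batchEventProcessor);

        long start = System.currentTimeMillis();
        cyclicBarrier.await(); // start test

        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i].get();
        } // all published

        latch.await(); // all handled

        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
        batchEventProcessor.halt();

        return opsPerSecond;
    }

    public static void main(String[] args) throws Exception
    {
        Main m = new Main();
        System.out.println("opsPerSecond:" + m.runDisruptorPass());
    }
}
```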

The producer, consumer and event classes are as follows:

```java
// ValueAdditionEventHandler.java
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.CountDownLatch;

public final class ValueAdditionEventHandler implements EventHandler<ValueEvent>
{
    private long value = 0;
    private long count;
    private CountDownLatch latch;

    public long getValue()
    {
        return value;
    }

    public void reset(final CountDownLatch latch, final long expectedCount)
    {
        value = 0;
        this.latch = latch;
        count = expectedCount;
    }

    @Override
    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        value = event.getValue();

        // Release the waiting main thread once the last expected sequence arrives.
        if (count == sequence)
        {
            latch.countDown();
        }
    }
}

// ValueBatchPublisher.java
import com.lmax.disruptor.RingBuffer;
import java.util.concurrent.CyclicBarrier;

public final class ValueBatchPublisher implements Runnable
{
    private final CyclicBarrier cyclicBarrier;
    private final RingBuffer<ValueEvent> ringBuffer;
    private final long iterations;
    private final int batchSize;

    public ValueBatchPublisher(
        final CyclicBarrier cyclicBarrier,
        final RingBuffer<ValueEvent> ringBuffer,
        final long iterations,
        final int batchSize)
    {
        this.cyclicBarrier = cyclicBarrier;
        this.ringBuffer = ringBuffer;
        this.iterations = iterations;
        this.batchSize = batchSize;
    }

    @Override
    public void run()
    {
        try
        {
            cyclicBarrier.await(); // wait until all publishers and the main thread are ready

            for (long i = 0; i < iterations; i += batchSize)
            {
                // Claim batchSize slots; next() returns the highest claimed sequence.
                long hi = ringBuffer.next(batchSize);
                long lo = hi - (batchSize - 1);
                for (long l = lo; l <= hi; l++)
                {
                    ValueEvent event = ringBuffer.get(l);
                    event.setValue(l);
                }
                // Make the whole batch visible to the consumer at once.
                ringBuffer.publish(lo, hi);
            }
        }
        catch (Exception ex)
        {
            throw new RuntimeException(ex);
        }
    }
}

// ValueEvent.java
import com.lmax.disruptor.EventFactory;

public final class ValueEvent
{
    private long value;

    public long getValue()
    {
        return value;
    }

    public void setValue(final long value)
    {
        this.value = value;
    }

    // Used by the ring buffer to preallocate every slot up front.
    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
    {
        public ValueEvent newInstance()
        {
            return new ValueEvent();
        }
    };
}
```

Three producer threads write the data and a single consumer processes it. Each producer writes in batches of 16 events.

In the actual test, the throughput reached 183486238 operations per second, i.e. about 180 million.
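The batch claim in ValueBatchPublisher (`ringBuffer.next(batchSize)` returning the highest claimed sequence `hi`, with `lo = hi - (batchSize - 1)`) is what lets three producers share one ring buffer without overlapping. The Go sketch below models only that claim step, assuming one atomic add per batch on a shared cursor that starts at -1. The names `claimer`, `next` and `claimAll` are hypothetical; unlike the real `next()`, this sketch never waits for the consumer to free slots.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
)

// claimer is a hypothetical stand-in for a multi-producer cursor.
// cursor holds the highest sequence claimed so far and starts at -1.
type claimer struct{ cursor int64 }

func newClaimer() *claimer { return &claimer{cursor: -1} }

// next claims n consecutive sequences and returns the inclusive range [lo, hi].
// There is no capacity check: the sketch only shows how claims stay disjoint.
func (c *claimer) next(n int64) (lo, hi int64) {
	hi = atomic.AddInt64(&c.cursor, n)
	lo = hi - (n - 1)
	return lo, hi
}

// claimAll lets each of `producers` goroutines claim `batches` batches of size n
// and returns the lower bounds of all claimed batches in ascending order.
func claimAll(producers, batches int, n int64) []int64 {
	c := newClaimer()
	var (
		mu  sync.Mutex
		los []int64
		wg  sync.WaitGroup
	)
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := 0; b < batches; b++ {
				lo, _ := c.next(n)
				mu.Lock()
				los = append(los, lo)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	sort.Slice(los, func(i, j int) bool { return los[i] < los[j] })
	return los
}

func main() {
	// Three producers, four batches each, 16 sequences per batch.
	los := claimAll(3, 4, 16)
	for i, lo := range los {
		if lo != int64(i)*16 {
			panic("batches overlap or leave a gap")
		}
	}
	fmt.Println(len(los), "disjoint batches") // 12 disjoint batches
}
```

Whatever order the goroutines run in, the sorted lower bounds are exactly 0, 16, 32, ..., so every sequence is claimed once. Claiming 16 at a time also means one contended atomic operation per 16 events instead of one per event.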

go-disruptor

Now let's see what performance go-disruptor can reach.

In Go, messages between goroutines are normally passed through channels. The go-disruptor project page compares go-disruptor with channels, and go-disruptor is clearly faster:

| Scenario | Per Operation Time |
|---|---|
| Channels: Buffered, Blocking, GOMAXPROCS=1 | 58.6 ns |
| Channels: Buffered, Blocking, GOMAXPROCS=2 | 86.6 ns |
| Channels: Buffered, Blocking, GOMAXPROCS=3, Contended Write | 194 ns |
| Channels: Buffered, Non-blocking, GOMAXPROCS=1 | 26.4 ns |
| Channels: Buffered, Non-blocking, GOMAXPROCS=2 | 29.2 ns |
| Channels: Buffered, Non-blocking, GOMAXPROCS=3, Contended Write | 110 ns |
| Disruptor: Writer, Reserve One | 4.3 ns |
| Disruptor: Writer, Reserve Many | 1.0 ns |
| Disruptor: Writer, Reserve One, Multiple Readers | 4.5 ns |
| Disruptor: Writer, Reserve Many, Multiple Readers | 0.9 ns |
| Disruptor: Writer, Await One | 3.0 ns |
| Disruptor: Writer, Await Many | 0.7 ns |
| Disruptor: SharedWriter, Reserve One | 13.6 ns |
| Disruptor: SharedWriter, Reserve Many | 2.5 ns |
| Disruptor: SharedWriter, Reserve One, Contended Write | 56.9 ns |
| Disruptor: SharedWriter, Reserve Many, Contended Write | 3.1 ns |

How does go-disruptor perform under the same test conditions as Java Disruptor?

Here is the test code:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"

	// Uses the go-disruptor API available when this article was written (2016); later versions may differ.
	disruptor "github.com/smartystreets/go-disruptor"
)

const (
	RingBufferSize   = 1024 * 64
	RingBufferMask   = RingBufferSize - 1
	ReserveOne       = 1
	ReserveMany      = 16
	ReserveManyDelta = ReserveMany - 1
	DisruptorCleanup = time.Millisecond * 10
)

// The messages live in this array; go-disruptor only coordinates sequence numbers.
var ringBuffer = [RingBufferSize]int64{}

func main() {
	NumPublishers := 3 // runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)

	fmt.Printf("Total: %d, Iterations: %d, Publisher: %d, Consumer: 1\n", totalIterations, iterations, NumPublishers)

	runtime.GOMAXPROCS(NumPublishers)
	var consumer = &countConsumer{TotalIterations: totalIterations, Count: 0}
	consumer.WG.Add(1)

	// BuildShared lets several writers share one sequence (multiple producers).
	controller := disruptor.Configure(RingBufferSize).WithConsumerGroup(consumer).BuildShared()
	controller.Start()
	defer controller.Stop()

	var wg sync.WaitGroup
	wg.Add(NumPublishers + 1)

	var sendWG sync.WaitGroup
	sendWG.Add(NumPublishers)

	for i := 0; i < NumPublishers; i++ {
		go func() {
			writer := controller.Writer()
			wg.Done()
			wg.Wait() // start barrier: wait until all publishers and main are ready
			current := disruptor.InitialSequenceValue
			for current < totalIterations {
				// Reserve returns the highest reserved sequence, so the
				// reserved range is [current-ReserveManyDelta, current].
				current = writer.Reserve(ReserveMany)

				for j := current - ReserveManyDelta; j <= current; j++ {
					ringBuffer[j&RingBufferMask] = j
				}
				// Commit exactly the reserved range, not a slot owned by another writer.
				writer.Commit(current-ReserveManyDelta, current)
			}

			sendWG.Done()
		}()
	}

	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait() // waiting for ready as a barrier
	fmt.Println("start to publish")

	sendWG.Wait()
	fmt.Println("Finished to publish")

	consumer.WG.Wait() // waiting for consumer
	fmt.Println("Finished to consume")

	t = (time.Now().UnixNano() - t) / 1000000 // ms

	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}

type countConsumer struct {
	Count           int64
	TotalIterations int64
	WG              sync.WaitGroup
}

// Consume receives the inclusive range [lower, upper] of committed sequences.
func (cc *countConsumer) Consume(lower, upper int64) {
	for lower <= upper {
		message := ringBuffer[lower&RingBufferMask]
		// Every slot must hold its own sequence number; anything else is a race.
		if message != lower {
			warning := fmt.Sprintf("\nRace condition--Sequence: %d, Message: %d\n", lower, message)
			fmt.Print(warning)
			panic(warning)
		}
		lower++
		cc.Count++
		// fmt.Printf("count: %d, message: %d\n", cc.Count-1, message)
		if cc.Count == cc.TotalIterations {
			cc.WG.Done()
			return
		}
	}
}
```

In the actual test, go-disruptor reached a throughput of 137931020 operations per second.
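With `BuildShared()`, the three writers reserve ranges from one shared sequence and commit them independently, so a later range can be committed before an earlier one. The consumer may therefore only read up to the highest contiguous committed sequence. The Go sketch below models that rule with a per-slot marker holding the last sequence committed into each slot, loosely following the availability-buffer idea in Java Disruptor's MultiProducerSequencer. It is not go-disruptor's code; `sharedRing`, `commit` and `highestContiguous` are hypothetical names, and the 8-slot ring is an assumption for illustration. It also shows why a writer must commit exactly the range it reserved: marking another writer's slot would let the consumer read it before it is written.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	size = 8 // hypothetical tiny ring; must be a power of two
	mask = size - 1
)

// sharedRing tracks, for each slot, the sequence most recently committed
// into it (-1 if the slot has never been committed).
type sharedRing struct {
	published [size]int64
}

func newSharedRing() *sharedRing {
	r := &sharedRing{}
	for i := range r.published {
		r.published[i] = -1
	}
	return r
}

// commit marks every sequence in [lower, upper] as readable.
func (r *sharedRing) commit(lower, upper int64) {
	for s := lower; s <= upper; s++ {
		atomic.StoreInt64(&r.published[s&mask], s)
	}
}

// highestContiguous returns the last sequence the consumer may read, scanning
// from `from` (first unread sequence) to `upTo` (highest reserved sequence)
// and stopping just before the first sequence that is not committed yet.
// Storing the sequence itself (not a flag) keeps stale marks from an earlier
// lap around the ring from being mistaken for new commits.
func (r *sharedRing) highestContiguous(from, upTo int64) int64 {
	for s := from; s <= upTo; s++ {
		if atomic.LoadInt64(&r.published[s&mask]) != s {
			return s - 1
		}
	}
	return upTo
}

func main() {
	r := newSharedRing()
	// Writer A reserved 0..3 and writer B reserved 4..7; B commits first.
	r.commit(4, 7)
	fmt.Println(r.highestContiguous(0, 7)) // -1: sequence 0 is not committed yet
	r.commit(0, 3)
	fmt.Println(r.highestContiguous(0, 7)) // 7
}
```

Writer B finishes first, but the consumer sees nothing until writer A's range 0..3 is committed; then all eight sequences become readable at once.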

So we now have two sets of numbers for the same test case. I also ran the same case with a Go channel, which gives three sets in total:

  • Java Disruptor : 183486238 ops/s
  • go-disruptor : 137931020 ops/s
  • go channel : 6995452 ops/s

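go-disruptor's throughput is about 25% lower than Java Disruptor's, but at roughly 140 million operations per second it is still very high and well worth our attention. The Go channel falls far behind both, at roughly one twentieth of go-disruptor's throughput.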
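All three programs compute throughput from a millisecond timer with integer division (`ops * 1000 / elapsedMillis`). Working backwards from the reported figures, assuming they came from exactly these formulas, shows how short each run was. The helper `impliedMillis` is hypothetical; the operation counts are taken from the code above (the Java program divides by ITERATIONS = 20,000,000, the Go programs by 3 × (20,000,000 / 3) = 19,999,998).

```go
package main

import "fmt"

// opsPerSecond is the formula all three test programs use:
// operations * 1000 / elapsed milliseconds, in integer arithmetic.
func opsPerSecond(ops, elapsedMillis int64) int64 {
	return ops * 1000 / elapsedMillis
}

// impliedMillis (a hypothetical helper) returns the shortest whole-millisecond
// run time for which the formula yields at most the reported figure.
func impliedMillis(ops, reported int64) int64 {
	ms := int64(1)
	for opsPerSecond(ops, ms) > reported {
		ms++
	}
	return ms
}

func main() {
	runs := []struct {
		name     string
		ops      int64 // the operation count each program divides by
		reported int64 // ops/s reported in this article
	}{
		{"Java Disruptor", 20000000, 183486238},
		{"go-disruptor", 19999998, 137931020},
		{"go channel", 19999998, 6995452},
	}
	for _, r := range runs {
		ms := impliedMillis(r.ops, r.reported)
		fmt.Printf("%s: %d ms, formula gives %d ops/s\n", r.name, ms, opsPerSecond(r.ops, ms))
	}
}
```

This yields 109 ms, 145 ms and 2859 ms respectively, and in each case the formula reproduces the reported figure exactly. A Disruptor run therefore lasts only about a tenth of a second, so 1 ms of timer granularity alone shifts the Java and go-disruptor results by roughly 1%: small next to the 25% gap, but a reminder that each figure comes from one short pass.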

Go Channel

Implemented with a Go channel, the same test reaches 6995452 operations per second.

The code is as follows:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	NumPublishers := 3 // runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)
	channel := make(chan int64, 1024*64)

	var wg sync.WaitGroup
	wg.Add(NumPublishers + 1)

	var readerWG sync.WaitGroup
	readerWG.Add(1)

	for i := 0; i < NumPublishers; i++ {
		go func() {
			wg.Done()
			wg.Wait() // start barrier
			for n := int64(0); n < iterations; {
				// Non-blocking send: spin until the buffer has room.
				select {
				case channel <- n:
					n++
				default:
				}
			}
		}()
	}

	go func() {
		// Advance i only when a message is received, so empty polls are not counted.
		for i := int64(0); i < totalIterations; {
			select {
			case msg := <-channel:
				// Message order can only be checked with a single publisher.
				if NumPublishers == 1 && msg != i {
					panic("Out of sequence")
				}
				i++
			default:
				// Channel empty: spin and poll again.
			}
		}

		readerWG.Done()
	}()

	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait()

	readerWG.Wait()
	t = (time.Now().UnixNano() - t) / 1000000 // ms

	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}
```
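The channel program above polls with a non-blocking `select` and a `default` branch, which corresponds to the "Non-blocking" rows of the go-disruptor comparison table. For comparison with the "Blocking" rows, the same three-publisher, one-consumer shape can be written with plain blocking sends and a `range` receive. This is only a sketch: it has not been benchmarked here, so no figure is claimed for it, and `runBlocking` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBlocking sends perPublisher values from each of numPublishers goroutines
// through one buffered channel and receives them all in the calling goroutine.
// It returns the number of messages received and their sum.
func runBlocking(numPublishers int, perPublisher int64, bufSize int) (count, sum int64) {
	ch := make(chan int64, bufSize)
	var wg sync.WaitGroup
	for p := 0; p < numPublishers; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := int64(0); n < perPublisher; n++ {
				ch <- n // blocks while the buffer is full
			}
		}()
	}
	// Close the channel once every publisher has finished so the range loop ends.
	go func() {
		wg.Wait()
		close(ch)
	}()
	for msg := range ch { // blocks while the buffer is empty
		count++
		sum += msg
	}
	return count, sum
}

func main() {
	const numPublishers = 3
	perPublisher := int64(20000000 / numPublishers)
	start := time.Now()
	count, _ := runBlocking(numPublishers, perPublisher, 1024*64)
	elapsed := time.Since(start)
	fmt.Printf("messages: %d, opsPerSecond: %.0f\n", count, float64(count)/elapsed.Seconds())
}
```

Blocking operations park the goroutine instead of spinning, which frees the CPU while the buffer is full or empty.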

 

Source: http://colobu.com/2016/07/22/using-go-disruptor/

 
