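go-disruptor: A High-Performance Messaging Framework
As Java programmers know, Disruptor is a high-performance inter-thread messaging framework developed by LMAX. It passes messages between threads inside a single JVM process.
Its performance is striking: LMAX uses it to process 6 million orders per second, and it sustains a throughput of 100K+ at a latency of 1 microsecond. Is there a comparable library in the Go ecosystem?
go-disruptor is a port of the Java Disruptor. It offers a similar API design and is not difficult to use.
Its performance is the focus of this article and is covered below.
Because of its performance, Disruptor has attracted a lot of attention, and there is a whole series of articles and resources about it, such as: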
- Disruptor Google Group
- Bad Concurrency (Michael Barker)
- LMAX (Planet)
- LMAX Exchange
- Disruptor presentation @ QCon SF
- Disruptor Technical Paper
- Mechanical Sympathy (Martin Thompson)
- Martin Fowler's Technical Review
- .NET Disruptor Port
- Introduction to the Disruptor
- Disruptor wiki
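There are also Chinese translations and introductions, such as the Disruptor series on 并發編程網 (Concurrent Programming Network).
Disruptor was developed by LMAX, whose goal is to become the world's fastest trading platform. To achieve low latency and high throughput, they had to build their own high-performance producer-consumer messaging framework, because the JDK's built-in queues add noticeable latency. The figure below compares the performance of Disruptor with the JDK's ArrayBlockingQueue.
The X axis shows latency and the Y axis shows the number of operations. Disruptor has lower latency and higher throughput.
Disruptor supports several usage models and configurations; the official test results for some of them are linked here.
What I actually want to do is compare the performance of go-disruptor with the official Java Disruptor. Disruptor can be configured in several ways (single or multiple producers, single or multiple consumers), and the configuration makes a large difference to performance. To be fair, the two should be compared under the same configuration, even though they are written in different languages.
The scenario I chose is three producers and one consumer. With a single producer, the Java Disruptor's throughput would be several times higher.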
Java Disruptor
The main Java test class is as follows:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.util.DaemonThreadFactory;

import static com.lmax.disruptor.RingBuffer.createMultiProducer;

public class Main {
    private static final int NUM_PUBLISHERS = 3; // Runtime.getRuntime().availableProcessors();
    private static final int BUFFER_SIZE = 1024 * 64;
    private static final long ITERATIONS = 1000L * 1000L * 20L;

    // One thread per publisher plus one for the consumer.
    private final ExecutorService executor = Executors.newFixedThreadPool(NUM_PUBLISHERS + 1, DaemonThreadFactory.INSTANCE);
    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(NUM_PUBLISHERS + 1);
    private final RingBuffer<ValueEvent> ringBuffer = createMultiProducer(ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new BusySpinWaitStrategy());
    private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    private final ValueAdditionEventHandler handler = new ValueAdditionEventHandler();
    private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, handler);
    private final ValueBatchPublisher[] valuePublishers = new ValueBatchPublisher[NUM_PUBLISHERS];

    {
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            valuePublishers[i] = new ValueBatchPublisher(cyclicBarrier, ringBuffer, ITERATIONS / NUM_PUBLISHERS, 16);
        }
        // Producers must not overwrite slots the consumer has not processed yet.
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    }

    public long runDisruptorPass() throws Exception
    {
        final CountDownLatch latch = new CountDownLatch(1);
        handler.reset(latch, batchEventProcessor.getSequence().get() + ((ITERATIONS / NUM_PUBLISHERS) * NUM_PUBLISHERS));

        Future<?>[] futures = new Future[NUM_PUBLISHERS];
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i] = executor.submit(valuePublishers[i]);
        }
        executor.submit(batchEventProcessor);

        long start = System.currentTimeMillis();
        cyclicBarrier.await(); // start test
        for (int i = 0; i < NUM_PUBLISHERS; i++)
        {
            futures[i].get();
        } // all published
        latch.await(); // all handled

        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
        batchEventProcessor.halt();
        return opsPerSecond;
    }

    public static void main(String[] args) throws Exception
    {
        Main m = new Main();
        System.out.println("opsPerSecond:" + m.runDisruptorPass());
    }
}
The consumer (event handler), producer (publisher), and event classes are as follows:
// ValueAdditionEventHandler.java
import java.util.concurrent.CountDownLatch;

import com.lmax.disruptor.EventHandler;

// Consumer: keeps the last value seen and releases the latch at the expected sequence.
public final class ValueAdditionEventHandler implements EventHandler<ValueEvent>
{
    private long value = 0;
    private long count;
    private CountDownLatch latch;

    public long getValue()
    {
        return value;
    }

    public void reset(final CountDownLatch latch, final long expectedCount)
    {
        value = 0;
        this.latch = latch;
        count = expectedCount;
    }

    @Override
    public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        value = event.getValue();
        if (count == sequence)
        {
            latch.countDown();
        }
    }
}
// ValueBatchPublisher.java
import java.util.concurrent.CyclicBarrier;

import com.lmax.disruptor.RingBuffer;

// Producer: claims batchSize slots at a time, fills them, then publishes the whole range.
public final class ValueBatchPublisher implements Runnable
{
    private final CyclicBarrier cyclicBarrier;
    private final RingBuffer<ValueEvent> ringBuffer;
    private final long iterations;
    private final int batchSize;

    public ValueBatchPublisher(
        final CyclicBarrier cyclicBarrier,
        final RingBuffer<ValueEvent> ringBuffer,
        final long iterations,
        final int batchSize)
    {
        this.cyclicBarrier = cyclicBarrier;
        this.ringBuffer = ringBuffer;
        this.iterations = iterations;
        this.batchSize = batchSize;
    }

    @Override
    public void run()
    {
        try
        {
            cyclicBarrier.await();
            for (long i = 0; i < iterations; i += batchSize)
            {
                // next(n) returns the highest claimed sequence of the batch.
                long hi = ringBuffer.next(batchSize);
                long lo = hi - (batchSize - 1);
                for (long l = lo; l <= hi; l++)
                {
                    ValueEvent event = ringBuffer.get(l);
                    event.setValue(l);
                }
                ringBuffer.publish(lo, hi);
            }
        }
        catch (Exception ex)
        {
            throw new RuntimeException(ex);
        }
    }
}
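// ValueEvent.java
import com.lmax.disruptor.EventFactory;

// Event: a mutable holder that is preallocated in every ring buffer slot.
public final class ValueEvent
{
    private long value;

    public long getValue()
    {
        return value;
    }

    public void setValue(final long value)
    {
        this.value = value;
    }

    public static final EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
    {
        public ValueEvent newInstance()
        {
            return new ValueEvent();
        }
    };
}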
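Three producer threads write data and a single consumer processes it. Each producer writes in batches of 16 entries.
In my test this reached 183486238 operations per second, roughly 180 million.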
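The batch arithmetic used by the publisher can be sketched in a few lines of Go. This is only an illustration of the `lo = hi - (batchSize - 1)` calculation and of how many batches each publisher sends; the function names `lowestInBatch` and `batchesNeeded` are made up for this sketch and are not part of either library.

```go
package main

import "fmt"

// lowestInBatch mirrors the publisher's arithmetic: the claim call returns
// the highest sequence of the batch, and the lowest is batchSize-1 below it.
// (Hypothetical helper, for illustration only.)
func lowestInBatch(hi, batchSize int64) int64 {
	return hi - (batchSize - 1)
}

// batchesNeeded returns how many batches a publisher sends when its loop
// counter advances by batchSize until it reaches iterations (ceiling division).
// (Hypothetical helper, for illustration only.)
func batchesNeeded(iterations, batchSize int64) int64 {
	return (iterations + batchSize - 1) / batchSize
}

func main() {
	const (
		totalIterations = int64(1000 * 1000 * 20)
		publishers      = int64(3)
		batchSize       = int64(16)
	)
	perPublisher := totalIterations / publishers
	batches := batchesNeeded(perPublisher, batchSize)

	fmt.Println("iterations per publisher:", perPublisher)
	fmt.Println("batches per publisher:   ", batches)
	fmt.Println("events actually written: ", batches*batchSize*publishers)
	fmt.Println("batch ending at 15 starts at:", lowestInBatch(15, batchSize))
}
```

Because the loop advances in steps of 16, each publisher rounds its share up to a whole number of batches, so slightly more than 20 million events are written in total.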
go-disruptor
Now let's see how go-disruptor performs.
In Go, goroutines normally pass messages to each other through the built-in channels. The go-disruptor project page compares go-disruptor with channels, and go-disruptor is clearly faster:
Scenario | Per Operation Time |
---|---|
Channels: Buffered, Blocking, GOMAXPROCS=1 | 58.6 ns |
Channels: Buffered, Blocking, GOMAXPROCS=2 | 86.6 ns |
Channels: Buffered, Blocking, GOMAXPROCS=3, Contended Write | 194 ns |
Channels: Buffered, Non-blocking, GOMAXPROCS=1 | 26.4 ns |
Channels: Buffered, Non-blocking, GOMAXPROCS=2 | 29.2 ns |
Channels: Buffered, Non-blocking, GOMAXPROCS=3, Contended Write | 110 ns |
Disruptor: Writer, Reserve One | 4.3 ns |
Disruptor: Writer, Reserve Many | 1.0 ns |
Disruptor: Writer, Reserve One, Multiple Readers | 4.5 ns |
Disruptor: Writer, Reserve Many, Multiple Readers | 0.9 ns |
Disruptor: Writer, Await One | 3.0 ns |
Disruptor: Writer, Await Many | 0.7 ns |
Disruptor: SharedWriter, Reserve One | 13.6 ns |
Disruptor: SharedWriter, Reserve Many | 2.5 ns |
Disruptor: SharedWriter, Reserve One, Contended Write | 56.9 ns |
Disruptor: SharedWriter, Reserve Many, Contended Write | 3.1 ns |
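To relate the per-operation times in the table to the operations-per-second figures used in the rest of this article, a time of t nanoseconds per operation corresponds to 1e9 / t operations per second. A minimal sketch, using a few rows copied from the table (the helper name `opsPerSecond` is an assumption of this example):

```go
package main

import "fmt"

// opsPerSecond converts a per-operation time in nanoseconds into
// operations per second. (Hypothetical helper, for illustration only.)
func opsPerSecond(nsPerOp float64) float64 {
	return 1e9 / nsPerOp
}

func main() {
	// A few rows taken from the table above.
	rows := []struct {
		scenario string
		nsPerOp  float64
	}{
		{"Channels: Buffered, Blocking, GOMAXPROCS=1", 58.6},
		{"Channels: Buffered, Non-blocking, GOMAXPROCS=3, Contended Write", 110},
		{"Disruptor: SharedWriter, Reserve One, Contended Write", 56.9},
		{"Disruptor: SharedWriter, Reserve Many, Contended Write", 3.1},
	}
	for _, r := range rows {
		fmt.Printf("%-65s %8.1f M ops/s\n", r.scenario, opsPerSecond(r.nsPerOp)/1e6)
	}
}
```

These are the project's own micro-benchmark numbers, so they are not directly comparable with the end-to-end test in this article, which also includes the consumer.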
How does go-disruptor perform under the same test conditions as the Java Disruptor?
Here is the test code:
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"

	disruptor "github.com/smartystreets/go-disruptor"
)

const (
	RingBufferSize   = 1024 * 64
	RingBufferMask   = RingBufferSize - 1
	ReserveOne       = 1
	ReserveMany      = 16
	ReserveManyDelta = ReserveMany - 1
	DisruptorCleanup = time.Millisecond * 10
)

var ringBuffer = [RingBufferSize]int64{}

func main() {
	NumPublishers := 3 // runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)

	fmt.Printf("Total: %d, Iterations: %d, Publisher: %d, Consumer: 1\n", totalIterations, iterations, NumPublishers)
	runtime.GOMAXPROCS(NumPublishers)

	var consumer = &countConsumer{TotalIterations: totalIterations, Count: 0}
	consumer.WG.Add(1)

	controller := disruptor.Configure(RingBufferSize).WithConsumerGroup(consumer).BuildShared()
	controller.Start()
	defer controller.Stop()

	var wg sync.WaitGroup
	wg.Add(NumPublishers + 1)
	var sendWG sync.WaitGroup
	sendWG.Add(NumPublishers)

	for i := 0; i < NumPublishers; i++ {
		go func() {
			writer := controller.Writer()
			wg.Done()
			wg.Wait() // start barrier shared with main
			current := disruptor.InitialSequenceValue
			for current < totalIterations {
				// Reserve returns the highest claimed sequence, so the batch
				// covers the ReserveMany slots ending at current.
				current = writer.Reserve(ReserveMany)
				for j := current - ReserveManyDelta; j <= current; j++ {
					ringBuffer[j&RingBufferMask] = j
				}
				writer.Commit(current-ReserveManyDelta, current)
			}
			sendWG.Done()
		}()
	}

	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait() // waiting for ready as a barrier
	fmt.Println("start to publish")
	sendWG.Wait()
	fmt.Println("Finished to publish")
	consumer.WG.Wait() // waiting for consumer
	fmt.Println("Finished to consume")

	t = (time.Now().UnixNano() - t) / 1000000 // ms
	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}

// countConsumer checks every message and signals WG once TotalIterations
// messages have been consumed.
type countConsumer struct {
	Count           int64
	TotalIterations int64
	WG              sync.WaitGroup
}

func (cc *countConsumer) Consume(lower, upper int64) {
	for lower <= upper {
		message := ringBuffer[lower&RingBufferMask]
		if message != lower {
			warning := fmt.Sprintf("\nRace condition--Sequence: %d, Message: %d\n", lower, message)
			fmt.Print(warning)
			panic(warning)
		}
		lower++
		cc.Count++
		//fmt.Printf("count: %d, message: %d\n", cc.Count-1, message)
		if cc.Count == cc.TotalIterations {
			cc.WG.Done()
			return
		}
	}
}
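The test code above maps a sequence number to a ring buffer slot with `sequence & RingBufferMask`. This works because the buffer size is a power of two, in which case masking with size-1 gives the same result as the modulo operation for non-negative sequences. A small sketch with an 8-slot buffer (the names `bufferSize`, `bufferMask`, and `slot` are chosen for this example only):

```go
package main

import "fmt"

const (
	bufferSize = 8              // must be a power of two for the mask trick
	bufferMask = bufferSize - 1 // 0b0111
)

// slot maps an ever-increasing sequence number to an index in the ring.
// (Hypothetical helper, for illustration only.)
func slot(sequence int64) int64 {
	return sequence & bufferMask
}

func main() {
	for _, seq := range []int64{0, 7, 8, 13, 16} {
		fmt.Printf("sequence %2d -> slot %d (sequence %% size = %d)\n",
			seq, slot(seq), seq%bufferSize)
	}
}
```

Sequences keep growing while the slots wrap around, which is why the producers must never get more than one buffer length ahead of the consumer.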
In my test go-disruptor reached 137931020 operations per second.
So we now have two sets of numbers for the same test case. I also ran the same case with Go channels, which gives three results in total:
- Java Disruptor : 183486238 ops/s
- go-disruptor : 137931020 ops/s
- go channel : 6995452 ops/s
go-disruptor is about 25% slower than the Java Disruptor, but at roughly 140 million operations per second it is still fast enough to deserve attention. Go channels are far slower than both.
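The ratios between the three results can be checked with a few lines of Go. The sketch also restates the throughput formula that the test programs use (operations * 1000 / elapsed milliseconds); the 200 ms elapsed time passed to it is a made-up value for illustration, not a measurement from these tests, and the helper names are assumptions of this example.

```go
package main

import "fmt"

// throughput applies the formula from the test programs:
// operations * 1000 / elapsed milliseconds, using integer division.
// (Hypothetical helper, for illustration only.)
func throughput(operations, elapsedMillis int64) int64 {
	return operations * 1000 / elapsedMillis
}

// ratio returns a divided by b.
func ratio(a, b float64) float64 {
	return a / b
}

func main() {
	// Results reported in this article, in operations per second.
	const (
		javaDisruptor = 183486238.0
		goDisruptor   = 137931020.0
		goChannel     = 6995452.0
	)
	fmt.Printf("go-disruptor / Java Disruptor: %.2f\n", ratio(goDisruptor, javaDisruptor))
	fmt.Printf("go-disruptor / Go channel:     %.1fx\n", ratio(goDisruptor, goChannel))
	fmt.Printf("Java Disruptor / Go channel:   %.1fx\n", ratio(javaDisruptor, goChannel))

	// Hypothetical example: 20 million operations in 200 ms.
	fmt.Println("example throughput:", throughput(20000000, 200))
}
```

On these numbers go-disruptor reaches about three quarters of the Java Disruptor's throughput and roughly twenty times that of the channel version.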
Go Channel
Implemented with a Go channel, the throughput is 6995452 operations per second.
The code is as follows:
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	NumPublishers := 3 // runtime.NumCPU()
	totalIterations := int64(1000 * 1000 * 20)
	iterations := totalIterations / int64(NumPublishers)
	totalIterations = iterations * int64(NumPublishers)

	channel := make(chan int64, 1024*64)

	var wg sync.WaitGroup
	wg.Add(NumPublishers + 1)
	var readerWG sync.WaitGroup
	readerWG.Add(1)

	for i := 0; i < NumPublishers; i++ {
		go func() {
			wg.Done()
			wg.Wait() // start barrier shared with main
			for i := int64(0); i < iterations; {
				select {
				case channel <- i:
					i++
				default:
					// Buffer full: spin and retry.
					continue
				}
			}
		}()
	}

	go func() {
		// Count only successful receives; an empty poll must not advance i.
		for i := int64(0); i < totalIterations; {
			select {
			case msg := <-channel:
				if NumPublishers == 1 && msg != i {
					panic("Out of sequence")
				}
				i++
			default:
				// Buffer empty: spin and retry.
				continue
			}
		}
		readerWG.Done()
	}()

	wg.Done()
	t := time.Now().UnixNano()
	wg.Wait()
	readerWG.Wait()

	t = (time.Now().UnixNano() - t) / 1000000 // ms
	fmt.Printf("opsPerSecond: %d\n", totalIterations*1000/t)
}
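The channel test relies on `select` with a `default` branch, which turns a send or receive into a non-blocking attempt so that the goroutine spins instead of parking. A minimal sketch of that pattern on its own (the helper names `trySend` and `tryReceive` are assumptions of this example):

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value
// was placed in the channel buffer. (Hypothetical helper.)
func trySend(ch chan<- int64, v int64) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer full, nothing was sent
	}
}

// tryReceive attempts a non-blocking receive and reports whether a
// value was available. (Hypothetical helper.)
func tryReceive(ch <-chan int64) (int64, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false // buffer empty, nothing was received
	}
}

func main() {
	ch := make(chan int64, 1)

	fmt.Println(trySend(ch, 1)) // true: the buffer had room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full

	v, ok := tryReceive(ch)
	fmt.Println(v, ok) // 1 true

	_, ok = tryReceive(ch)
	fmt.Println(ok) // false: the buffer is empty again
}
```

A loop built on this pattern has to count only the attempts that succeed, otherwise empty polls are mistaken for delivered messages.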
Source: http://colobu.com/2016/07/22/using-go-disruptor/