Kafka Zero-Copy 使用分析
來自: http://www.jianshu.com/p/d47de3d6d8ac
之前有聽過Zero-Copy 技術,而Kafka是典型的使用者。網上找了找,竟然沒有找到合適的介紹文章。正好這段時間正在閱讀Kafka的相關代碼,于是有了這篇內容。這篇文章會簡要介紹Zero-Copy技術在Kafka的使用情況,希望能給大家一定借鑒和學習樣例。
</div>
前言
Kafka 我個人感覺是性能優化的典范。而且使用Scala開發,代碼寫的也很漂亮的。重點我覺得有四個
- NIO
- Zero Copy
- 磁盤順序讀寫
- Queue數據結構的極致使用
Zero-Copy 實際的原理,大家還是去Google下。這篇文章重點會分析這項技術是怎么被嵌入到Kafa里的。包含兩部分:
- Kafka在什么場景下用了這個技術
- Zero-Copy 是如何被調用,并且發揮作用的。
Kafka在什么場景下使用該技術
答案是:
消息消費的時候
包括外部Consumer以及Follower 從partiton Leader同步數據,都是如此。簡單描述就是:
Consumer從Broker獲取文件數據的時候,直接通過下面的方法進行channel到channel的數據傳輸。
java.nio.FileChannel.transferTo( long position, long count, WritableByteChannel target)`
也就是說你的數據源是一個Channel,數據接收端也是一個Channel(SocketChannel),則通過該方式進行數據傳輸,是直接在內核態進行的,避免拷貝數據導致的內核態和用戶態的多次切換。
Kafka 如何使用Zero-Copy流程分析
估計看完這段內容,你對整個Kafka的數據處理流程也差不多了解了個大概。為了避免過于繁雜,以至于將整個Kafka的體系都拖進來,我們起始點從KafkaApis相關的類開始。
數據的生成
對應的類名稱為:
kaka.server.KafkaApis
該類是負責真正的Kafka業務邏輯處理的。在此之前的,譬如 SocketServer等類似Tomcat服務器一樣,側重于交互,屬于框架層次的東西。KafkaApis 則類似于部署在Tomcat里的應用。
def handle(request: RequestChannel.Request) { ApiKeys.forId(request.requestId) match { case ApiKeys.PRODUCE => handleProducerRequest(request) case ApiKeys.FETCH => handleFetchRequest(request) .....
handle 方法是所有處理的入口,然后根據請求的不同,有不同的處理邏輯。這里我們關注 ApiKeys.FETCH 這塊,也就是有消費者要獲取數據的邏輯。進入 handleFetchRequest 方法,你會看到最后一行代碼如下:
replicaManager.fetchMessages( fetchRequest.maxWait.toLong, fetchRequest.replicaId, fetchRequest.minBytes, authorizedRequestInfo, sendResponseCallback)
ReplicaManager 包含所有主題的所有partition消息。大部分針對Partition的操作都是通過該類來完成的。
replicaManager.fetchMessages 這個方法非常的長。我們只關注一句代碼:
val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)
該方法獲取本地日志信息數據。內部會調用 kafka.cluster.Log 對象的read方法:
log.read(offset, fetchSize, maxOffsetOpt)
Log 對象是啥呢?其實就是對應的一個Topic的Partition. 一個Partition是由很多端(Segment)組成的,這和Lucene非常相似。一個Segment就是一個文件。實際的數據自然是從這里讀到的。代碼如下:
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition)
這里的fetchInfo(FetchDataInfo)對象包含兩個字段:
- offsetMetadata
- FileMessageSet
FileMessageSet 其實就是用戶在這個Partition這一次消費能夠拿到的數據集合。當然,真實的數據還躺在byteBuffer里,并沒有記在到內存中。FileMessageSet 里面包含了一個很重要的方法:
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = { ......val bytesTransferred = (destChannel match { case tl: TransportLayer => tl.transferFrom(channel, position, count) case dc => channel.transferTo(position, count, dc) }).toInt bytesTransferred
}</pre>
這里我們看到了久違的 transferFrom 方法。那么這個方法什么時候被調用呢?我們先擱置下,因為那個是另外一個流程。我們繼續分析上面的代碼。也就是接著從這段代碼開始分析:
val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)獲取到這個信息后,會執行如下操作:
val fetchPartitionData = logReadResults.mapValues(result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet)) responseCallback(fetchPartitionData)logReadResults 的信息被包裝成 FetchResponsePartitionData , FetchResponsePartitionData 包喊了我們的FileMessageSet 對象。還記得么,這個對象包含了我們要跟蹤的 tranferTo方法 。然后FetchResponsePartitionData 會給responseCallback作為參數進行回調。
responseCallback 的函數簽名如下(我去掉了一些我們不關心的信息):
def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatusdef fetchResponseCallback(delayTimeMs: Int) { val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response))) } }</pre>
我們重點關注這個回調方法里的 fetchResponseCallback 。 我們會發現這里 FetchResponsePartitionData 會被封裝成一個 FetchResponseSend ,然后由 requestChannel 發送出去。
因為Kafka完全應用是NIO的異步機制,所以到這里,我們無法再跟進去了,需要從另外一部分開始分析。
數據的發送
前面只是涉及到數據的獲取。讀取日志,并且獲得對應MessageSet對象。MessageSet 是一段數據的集合,但是該數據沒有真實的被加載。這里會涉及到Kafka 如何將數據發送回Consumer端。
在SocketServer,也就是負責和所有的消費者打交道,建立連接的中樞里,會不斷的進行poll操作
override def run() { startupComplete() while(isRunning) { try { // setup any new connections that have been queued up configureNewConnections() // register any new responses for writing processNewResponses()首先會注冊新的連接,如果有的話。接著就是處理新的響應了。還記得剛剛上面我們通過 requestChannel 把 FetchResponseSend 發出來吧。
private def processNewResponses() { var curr = requestChannel.receiveResponse(id) while(curr != null) { try { curr.responseAction match {
case RequestChannel.SendAction => selector.send(curr.responseSend) inflightResponses += (curr.request.connectionId -> curr)} } finally { curr = requestChannel.receiveResponse(id) } }
}</pre>
這里類似的,processNewResponses方法會先通過 send 方法把FetchResponseSend注冊到selector上。 這個操作其實做的事情如下:
//SocketServer.scala
public void send(Send send) { KafkaChannel channel = channelOrFail(send.destination()); channel.setSend(send); }//KafkaChannel.scala public void setSend(Send send) { this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}</pre>為了方便看代碼,我對代碼做了改寫。我們看到,其實send就是做了一個WRITE時間注冊。這個是和NIO機制相關的。如果大家看的有障礙,不妨先學習下相關的機制。
回到 SocketServer 的 run 方法里,也就是上面已經貼過的代碼:
override def run() { startupComplete() while(isRunning) { try { // setup any new connections that have been queued up configureNewConnections() // register any new responses for writing processNewResponses()try { selector.poll(300) } catch { case... }</pre>
SocketServer 會poll隊列,一旦對應的KafkaChannel 寫操作ready了,就會調用KafkaChannel的write方法:
//KafkaChannel.scala public Send write() throws IOException { if (send != null && send(send)) } // //KafkaChannel.scala private boolean send(Send send) throws IOException { send.writeTo(transportLayer); if (send.completed()) transportLayer.removeInterestOps(SelectionKey.OP_WRITE);return send.completed(); }</pre>
依然的,為了減少代碼,我做了些調整,其中write會調用 send方法,對應的Send對象其實就是上面我們注冊的 FetchResponseSend 對象。
這段代碼里真實發送數據的代碼是 send.writeTo(transportLayer); ,
對應的writeTo方法為:
private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map { case(topic, data) => new TopicDataSend(dest, TopicData(topic, data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)})) })) override def writeTo(channel: GatheringByteChannel): Long = { ..... written += sends.writeTo(channel) .... }這里我依然做了代碼簡化,只讓我們關注核心的。 這里最后是調用了 sends 的writeTo方法,而sends 其實是個 MultiSend 。
這個MultiSend 里有兩個東西:
</div>
- topicAndPartition.partition: 分區
- message:FetchResponsePartitionData
還記得這個FetchResponsePartitionData 么?我們的MessageSet 就被放在了FetchResponsePartitionData這個對象里。
TopicDataSend 也包含了sends,該sends 包含了 PartitionDataSend,而 PartitionDataSend則包含了FetchResponsePartitionData。
最后進行writeTo的時候,其實是調用了
//partitionData 就是 FetchResponsePartitionData //messages 其實就是FileMessageSet val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)如果你還記得的話,FileMessageSet 也有個writeTo方法,就是我們之前已經提到過的那段代碼:
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = { ......val bytesTransferred = (destChannel match { case tl: TransportLayer => tl.transferFrom(channel, position, count) case dc => channel.transferTo(position, count, dc) }).toInt bytesTransferred
}</pre>
終于走到最底層了,最后其實是通過tl.transferFrom(channel, position, count) 來完成最后的數據發送的。這里你可能比較好奇,不應該是調用 transferTo 方法么? transferFrom 其實是Kafka自己封裝的一個方法,最終里面調用的也是transerTo:
@Override public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException { return fileChannel.transferTo(position, count, socketChannel); }總結
Kafka的整個調用棧還是非常繞的。尤其是引入了NIO的事件機制,有點類似Shuffle,把流程調用給切斷了,無法簡單通過代碼引用來進行跟蹤。Kafka還有一個非常優秀的機制就是DelayQueue機制,我們在分析的過程中,為了方便,把這塊完全給抹掉了。
</div>本文由用戶 DanHelmick 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!