Java NIO的通訊組件gecko框架概述

lieee 9年前發布 | 20K 次閱讀 gecko Java開發

1 gecko概述

最近在研究metaq消息隊列,它里面用到的NIO通信框架是gecko,文檔是這么描述的

Gecko是一個Java NIO的通訊組件,它在一個輕量級的NIO框架的基礎上提供了更高層次的封裝和功能。

支持的RPC調用方式包括RR(request-response)和pipeline。
  • 0 可插拔的協議設計
  • 1 連接池
  • 2 分組管理和負載均衡
  • 3 Failover/Retry
  • 4 重連管理
  • 5 同步和異步調用

本文就按照日常NIO通信框架和RPC所面臨的問題來看下gecko是怎么解決和實現的

2 gecko實現NIO通信框架

2.1 NIO類庫的使用

像Netty、Mina這種NIO通信框架都是不使用Jdk自帶的NIO類庫,自己重寫NIO類庫。而gecko則是使用的是jdk自帶的NIO類庫,具體的差別,我現在還不太了解

2.2 線程模型的選擇

先簡單描述下NIO形式下的線程模型,具體參考這篇文章infoq:Netty系列之Netty線程模型

  • 傳統BIO模型

傳統BIO模型

比較簡單,每來一個連接就創建一個線程來處理。這時候最大線程數就成了連接數的限制。

  • Reactor單線程模型

Java NIO的通訊組件gecko框架概述

使用一個線程,使用Selector來監聽連接的建立和連接的讀寫事件,然后仍然是該線程去完成事件的處理。

  • Reactor多線程模型

Java NIO的通訊組件gecko框架概述

使用一個主線程,該線程的Selector只負責接收連接的建立,建立連接后將該連接的讀寫事件注冊到其他線程的Selector,目前大部分都是采用這種方式。

下面再舉幾個例子:ZooKeeper通信使用的線程模型、Netty、Gecko

2.2.1 ZooKeeper通信使用的線程模型

ZooKeeper通信使用的線程模型就比較簡單,默認使用上述Reacter單線程模型。并且是采用jdk自帶的NIO類庫來實現的。

服務器端開啟一個線程,創建出一個Selector,在該線程中不斷的檢測連接的建立和讀寫事件。代碼大致如下

ZooKeeper NIO通訊

ZooKeeper NIO通訊

一旦有客戶端建立連接,則獲取SocketChannel,并設置非阻塞,注冊到seclector上,綁定到SelectionKey

一旦是讀寫事件則從SelectionKey中取出進行數據的讀寫

上述事件都是由一個線程來完成的,要求不高的情況下就可以滿足了。如大部分框架使用ZooKeeper的場景都是作為一個協調服務,訪問量很小的。 如果ZooKeeper承載了很多很多的服務,上述單線程通信方式滿足不了了,就需要使用Reactor多線程模型,如采用netty作為NIO通訊框 架。

2.2.2 Netty采用的線程模型

可以簡單就說成采用Reactor多線程模型。會有一個boss線程池和worker線程池。boss線程池上的Selector專門負責連接的建 立,一旦連接建立起來之后,從worker線程池上獲取一個線程(獲取過程實現簡單的負載均衡),將連接注冊到該線程上的Selector上,讓 worker線程池負責讀寫操作,具體如下:

  • 1 每一個NioEventLoop:包含一個線程和一個Selector selector復用器

    因此一個NioEventLoop就可以處理很多鏈路,每個鏈路的讀寫操作全部交給這一個線程來處理,避免了并發操作同一個鏈路的可能性

    Java NIO的通訊組件gecko框架概述

  • 2 每當有一個新的客戶端接入,則從NioEventLoop線程組中順序獲取一個可用的NioEventLoop,當到達數組上限之后,重新返回到0,通 過這種方式,可以基本保證各個NioEventLoop的負載均衡。一個客戶端連接只注冊到一個NioEventLoop上,這樣就避免了多個IO線程去 并發操作它

2.2.3 Gecko采用的線程模型

也是采用的是Reactor多線程模型。有一個SelectorManager,它含一個Reactor[] reactorSet,每一個Reactor都是一個線程,并且擁有自己的Selector,和上述netty的NioEventLoop差不多。

SelectorManager將第一個Reactor專門用作接收客戶端連接的作用,連接建立起來之后,將連接注冊到其余的Reactor的Selector上。 在選擇Reactor也是采用簡單的順序輪訓的策略。

SelectorManager代碼如下:

SelectorManager

選擇Reactor代碼如下:

在這里輸入圖片標題

上述netty中,有關某個連接的讀寫事件全部交給了該連接注冊的worker線程上,對某個連接的操作只會在同一個線程中進行,避免了并發操作,但是每個線程必須完成當前的讀寫操作后才能去執行接下來的另外的讀寫事件。

而Gecko則是將讀寫事件交給了專門的讀寫線程池,這樣的話Reactor線程只管發出讀寫任務(一個Runnable對象),真正的讀寫操作交 給讀寫線程池來完成,Reactor線程處理事件的并發量就大了,但是這樣的話就是就可能出現多個IO線程并發操作同一個連接,加大了出錯的風險。

2.3 編解碼和序列化反序列化的處理

這一部分就需要處理粘包問題、采用的協議。不同的協議就要使用不同編解碼器,所以文章開頭所說的可插拔的協議設計就是指用戶可以自己指定自己協議所使用的編解碼器

就來詳細看下處理粘包的過程:

  • 1 從SocketChannel中取出全部的可讀數據,將數據寫到一個buffer中,如果buffer承載不下,暫時就不讀了,先處理buffer中的數據,處理完成之后再來寫剩余的數據到buffer中

  • 2 將buffer執行flip操作,由寫狀態進入讀狀態

  • 3 buffer中有很多數據,可能是客戶端發送過來的多條消息數據,而目前采用的大部分協議都是固定長度的Header加Body的形式,Header中指明了Body的長度。這樣的話,肯能就存在幾種情況:

    • 3.1 buffer中可讀長度不足Header的固定長度,則此數據還沒到齊,暫時還不能解析,需要直接返回,不做處理
    • 3.2 buffer中可讀長度大于一個Header的固定長度,則可以進行讀取解析,獲取Header中每一位定義的信息(這就是所采用的協議定義的),如dubbo協議中Header中的每一位信息

    Java NIO的通訊組件gecko框架概述

    • 3.3 Header中讀取完畢,就知道了Body的長度,還需要判斷下buffer中剩余可讀數據的長度是否大于剛才解析出的Header中給出的Body的長度,如果不足的話,此時有兩種方式:

      一種就是回溯所讀取的數據,在解析Header之前,先保留當前的index,解析header之后,發現buffer中剩余長度不足則將buffer重置讀取位置到上述index。

      另一種就是將解析出來的Header信息先綁定到連接上,每次解析數據前,先從連接中獲取Header信息,如果有,說明上次已經解析完Header信息了,但是Body中信息不足,那就直接開始解析Body了

      第一種方式:一旦出現Body信息不足,就回溯了,當信息充足后,會重復解析Header中的內容。第二種方式則不會進行重復解析

      而dubbo中處理方式就是采用的第一種,保留一個index。一旦消息不完整則回退到該index。

      目前Gecko就是采用的第二種方式,將Header先暫時保存在連接中

來詳細看下dubbo中的處理方式:

dubbo中對粘包消息的處理

再詳細看下Gecko對粘包的處理方式:

Gecko對粘包的處理方式

buffer中多條消息的處理:

buffer中多條消息的處理

來看下一個解碼器的實現,啟動不同場景下的Server使用不同的編解碼器,如metaq命令行方式就是使用如下編解碼器,專門用于處理請求命令的:

Gecko中對Header和Body的解析

對于RPC,不僅要進行編解碼,同時還要涉及序列化和發序列化的問題:

dubbo中都是通過url中參數值來定制化編解碼器和序列化反序列化方式,提供了各種豐富多樣的編解碼和序列化反序列化方式,而Gecko實現的 RPC也可以實現定制化。默認方式是使用jdk自帶的序列化和反序列化方式,即ObjectInputStream方式,簡單代碼如下

Gecko RPC解碼過程:

RPC解碼過程

Gecko RPC序列化過程:

Gecko RPC序列化過程

3 后續

重連管理、同步轉異步、異步回調、分組管理和負載均衡之后再說.

來自:http://my.oschina.net/pingpangkuangmo/blog/547938

 本文由用戶 lieee 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!