zookeeper client分析
1)幾個重要概念
ZooKeeper:客戶端入口
Watcher:客戶端注冊的callback
ZooKeeper.SendThread: IO線程
ZooKeeper.EventThread: 事件處理線程,處理各類消息callback
ClientCnxnSocketNIO:繼承自ClientCnxnSocket,專門處理IO
</ul>
應用提供watch實例
實例化zookeeper
實例化socket,默認使用ClientCnxnSocketNIO,可通過zookeeper.clientCnxnSocket配置定制
實例化ClientCnxn
實例化SendThread
實例化EventThread
</ul>
啟動zookeeper
啟動SendThread
連接服務器(見SendThread.startConnect)
產生真正的socket,見ClientCnxnSocketNIO.createSock
向select注冊一個OP_CONNECT事件并連接服務器,由于是非阻塞連接,此時有可能并不會立即連上,如果連上就會調用SendThread.primeConnection初始化連接來注冊讀寫事件,否則會在接下來的輪詢select獲取連接事件中處理
復位socket的incomingBuffer
</ul>
連接成功后會產生一個connect型的請求發給服務,用于獲取本次連接的sessionid
進入循環等待來自應用的請求,如果沒有就根據時間來ping 服務器
</ul>
啟動EventThread
開始進入無限循環,從隊列waitingEvents中獲取事件,如果沒有就阻塞等待
客戶端線程
構造一個exists類型的請求,請求類型見ZooDefs.OpCode
將請求構造成一個Packet,并將該packet放入outgoingQueue
喚醒select
阻塞等待結果
</ul>
SendThread
通過select 輪詢判斷是否有socket準備好,如果能讀就讀,能寫就寫
此時socket準備好寫了 ,就從outgoingQueue獲取packet, 將packet發送到服務端
一旦發送了一個完整的packet,就將packet從outgoingQueue移除
最后將packet加入到pendingQueue
再次select輪詢看是否有響應數據,如果有首先都去4個字節的響應頭(包含響應的長度信息),然后在下一次遍歷中都去響應體
都到響應將packet從pendingQueue移除
如果該請求packet帶有一個callback,那么會將此packet放入waitingEvents隊列,讓EventThread去處理
最后會調用p.notifyAll()解鎖,于是應用線程從阻塞中出來
</ul>
如果使用了帶callback 的exists,EventThread會干活
</ul>
now :每次輪詢select之前更新,或者發生錯誤是在catch段中更新為當前時間
lastHeard:在讀取了響應,包括上面提到的connect型請求和常規命令型請求的響應以及完成網絡連接時更新為當前時間
lastSend:每次發送完ping 命令和請求以及完成網絡連接時更新為當前時間
</ul>
sessionTimeout:zookeeper初始化時設置的
readTimeout:sessionTimeout * 2 / 3
connectTimeout:sessionTimeout / hostProvider.size(); //hostProvider.size()為zookeeper服務器個數
getIdleRecv():now - lastHeard
getIdleSend():now - lastSend
SessionTimeout的計算
如果沒有完成連接to=connectTimeout - getIdleRecv()
如果完成連接to=readTimeout - getIdleRecv()
如果to<=0 就會拋出SessionTimeoutException
</ul>
</ul>
計算select timeout(上面提到的to)
檢查空閑時間,有可能拋出SessionTimeoutException或者發送ping
使用select輪詢,獲取網絡事件(連接、讀、寫)也就是這3類
如果是連接,做連接處理
如果讀,過程如下
讀取消息頭,4個字節,頭包含了消息體的字節數
讀 取消息體,分為兩個大類消息,連接型消息“connect”和非連接型消息“header”,前者上面提到過就是連接完成之后發的一種消息,用于確定 sessionid, 另外前者會調用sendThread.onConnected,后者會調用sendThread.readResponse
非連接型消息有分為幾類
ping 消息
auth認證消息
訂閱的消息,即各種變化的通知,比如子節點變化、節點內容變化,由服務器推過來的消息 ,獲取到這類消息或通過eventThread.queueEvent將消息推入事件隊列
客戶端命令的response,如果此消息帶有callback著通過eventThread.queuePacket推入事件隊列,否者喚醒阻塞的應用線程,注意到客戶端命令都會有阻塞版本和異步版本(帶callback)
</ul>
</ul>
如果是寫,就從outgoingQueue獲取packet,寫入網絡
</ul>
</ul>
沒有使用傳統連接池,會和zookeeper集群中的一臺相連
單IO線程(NIO)+事件線程,很標準的NIO模式
</ul>
轉自:http://blog.csdn.net/pwlazy/article/details/8000566
2)zookeeper初始化
</ul>
3)以一個請求為例以 zk.exists("/root", false)為例
4)小結
4.1)
SendThread也并非完全對應與請求/響應模式,SendThread也會接受到節點變化的通知,此時客戶端變成了服務端
4.2)時間和超時的控制
ClientCnxnSocket作為ClientCnxnSocketNIO的父類,
有3個關鍵的時間字段
有下面幾個超時設置
4.3)什么時候ping
計算timeToNextPing = readTimeout / 2-getIdleSend()
如果timeToNextPing <= 0,發送ping請求(只是將ping請求放入outgoingQueue,并不發生IO)
4.4)select阻塞多久
如果上述的0<timeToNextPing<to,那么阻塞時長為timeToNextPing,否則為to
如果有寫請求,select會被喚醒
4.5)sendThread的工作原理
該線程作為zookeeper客戶端的核心部分專門負責IO處理
4.6)請求中的Watcher和StatCallback的差別
兩個都是callback,兩者都由EventThread,但后者控制調用線程是否會阻塞等待響應
4.7)IO模型
如圖