消息中間件RabbitMQ的心跳檢測總結

jopen 10年前發布 | 39K 次閱讀 RabbitMQ 消息系統

heartbeat通常用來檢測通信的對端是否存活(未正常關閉socket連接而異常crash)。其基本原理是檢測對應的socket連接上數據的收發是否正常,如果一段時間內沒有收發數據,則向對端發送一個心跳檢測包,如果一段時間內沒有回應則認為心跳超時,即認為對端可能異常crash了。

rabbitmq也不例外,heatbeat在客戶端和服務端之間用于檢測對端是否正常,即客戶端與服務端之間的tcp鏈接是否正常。

1. heartbeat檢測時間間隔的設置

1). 服務端的設置

heartbeat檢測時間間隔可在配置文件rabbitmq.config中增加配置項{heartbeat,Timeout}進行配置,其中Timeout指定時間間隔,單位為秒。

例如:

消息中間件RabbitMQ的心跳檢測總結

如果沒有進行配置,默認的時間間隔為600秒(最新版本3.2.2修改為580秒)

在rabbit.app中有heartbeat的配置項。

2). 客戶端的設置

根據AMQP協議,rabbitmq會通過connection.tune信令將heartbeat檢測時間間隔告知客戶端,客戶端可以根據需要重新設置該值,并通過Connection.tune-ok信令將時間間隔再告訴給rabbitmq,rabbitmq會以客戶端的時間作為該tcp連接上heartbeat檢測的間隔時間。

消息中間件RabbitMQ的心跳檢測總結

這里要注意的是:如果時間間隔配置為0,則表示不啟用heartbeat檢測。

如果啟用了rabbitmq_management的話,從web控制臺可以看到客戶端的檢測的時間間隔。

消息中間件RabbitMQ的心跳檢測總結

2. heartbeat的實現

rabbitmq在收到來自客戶端的connection.tune-ok信令后,啟用心跳檢測,rabbitmq會為每個tcp連接創建兩個進程用于心跳檢測,一個進程定時檢測tcp連接上是否有數據發送(這里的發送是指rabbitmq發送數據給客戶端),如果一段時間內沒有數據發送給客戶端,則發送一個心跳包給客戶端,然后循環進行下一次檢測;另一個進程定時檢測tcp連接上是否有數據的接收,如果一段時間內沒有收到任何數據,則判定為心跳超時,最終會關閉tcp連接。另外,rabbitmq的流量控制機制可能會暫停heartbeat檢測,這里不展開描述。

涉及的源碼:

start(SupPid, Sock, SendTimeoutSec,
      SendFun, ReceiveTimeoutSec, ReceiveFun) ->
    %%數據發送檢測進程
    {ok, Sender} = start_heartbeater(SendTimeoutSec, SupPid, Sock,
                                     SendFun, heartbeat_sender,
                                     start_heartbeat_sender),
    %%數據接收檢測進程
    {ok, Receiver} = start_heartbeater(ReceiveTimeoutSec, SupPid,
                                       Sock, ReceiveFun,
                                       heartbeat_receiver,
                                       start_heartbeat_receiver),
    {Sender, Receiver}.

start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
    %% the 'div 2' is there so that we don't end up waiting for
    %% nearly 2 * TimeoutSec before sending a heartbeat in the
    %% boundary case
    heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
                 fun () -> SendFun(), continue end}).

start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
    %% we check for incoming data every interval, and time out after
    %% two checks with no change. As a result we will time out
    %% between 2 and 3 intervals after the last data has been
    %% received
    heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
                fun () -> ReceiveFun(), stop end}).

heartbeater({Sock, TimeoutMillisec, 
             StatName, Threshold, Handler} = Params,
            Deb,
            {StatVal, SameCount} = State) ->
    Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end,
    receive
        ...
    %% 定時檢測
    after TimeoutMillisec ->
        case rabbit_net:getstat(Sock, [StatName]) of
            {ok, [{StatName, NewStatVal}]} ->
                %% 收發數據有變化
                if NewStatVal =/= StatVal ->
                       %%重新開始檢測
                       Recurse({NewStatVal, 0});
                   %%未達到指定次數, 發送為0, 接收為1
                   SameCount < Threshold ->
                       %%計數加1, 再次檢測
                       Recurse({NewStatVal, SameCount + 1});
                   %%heartbeat超時
                   true ->
                       %%對于發送檢測超時, 向客戶端發送heartbeat包
                       %%對于接收檢測超時, 向父進程發送超時通知
                       %%由父進程觸發tcp關閉等操作
                       case Handler() of
                           %%接收檢測超時
                           stop     -> ok;
                           %%發送檢測超時
                           continue -> Recurse({NewStatVal, 0})
                       end;
           ...



收發檢測的時候利用了inet模塊的getstat,查看socket的統計信息


recv_oct: 查看socket上接收的字節數

send_oct: 查看socket上發送的字節數

inet詳細見這里: http://www.erlang.org/doc/man/inet.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!