RabbitMQ架構

mssj0912 6年前發布 | 30K 次閱讀 RabbitMQ 消息系統

RabbitMQ是一個高可用的消息中間件,支持多種協議和集群擴展。并且支持消息持久化和鏡像隊列,適用于對消息可靠性較高的場合,基本模型如下。

其客戶端使用方式

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print body
    message.ack()

# connections
with Connection('amqp://guest:guest@localhost//') as conn:

    # produce
    producer = conn.Producer(serializer='json')
    producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

    # the declare above, makes sure the video queue is declared
    # so that the messages can be delivered.
    # It's a best practice in Kombu to have both publishers and
    # consumers declare the queue. You can also declare the
    # queue manually using:
    # video_queue(conn).declare()

    # consume
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

示例中的發布端和消費端是同一方,而實際中的使用方式一般有多種場景,topic模式、fanout模式、direct模式和RPC模式。

  1. topic模式,按照設置的路由信息(routing key)將消息路由到一個或者多個消費端,而消息只能由一個消費者消費一次。一個消費者可以設置多個路由信息,可以同時獲取多個消費者發送的消息;
  2. fanout模式,與topic模式唯一的區別是同一消息會發送到訂閱(binding)的多個消費者;
  3. direct模式,一對一模式,實際中比較少用;
  4. RPC模式,結合topic和direct模式,發送消息的同時指定要接受的消息。

RabbitMQ監控樹

為了高可靠,Erlang中實際的工作進程(Erlang進程,并不是系統進程)都有一個監控進程,監控進程負責(一個或多個)工作進程的創建、銷毀和重啟。監控進程和工作進程的關系如圖。

  1. 方塊圖是監控進程;
  2. 圓圈是工作進程;
  3. 方塊中的”1“(one_for_one)和”a“(one_for_all)代表不同的監控策略

one_for_one 監控策略,一個工作進程崩潰,則只重啟崩潰的工作進程。

one_for_all監控策略,一個工作進程崩潰,則銷毀并重啟所有工作進程

在RabbitMQ中還有一種 simple_one_for_one監控策略 ,與 one_for_one監控策略 相同,只不過重啟工作進程時的啟動參數是固定的。RabbitMQ網絡框架也遵循該原則。

RabbitMQ消息架構

當client端鏈接服務器時,RabbitMQ會啟動一系列監控和工作進程來處理網絡連接。

為了降低TCP鏈接數量,多個消費者共享同一個鏈接Connection,但是每個消費者獨享一個管道channel,用consumer_tag標識。consumer_tag在Connection唯一,從1開始累加,當重連接時需要匹配該tag。每個消費者對應獨立的一套rabbit_channel_sup_sup->rabbit_channel_sup->rabbit_channel|rabbit_writer|rabbit_limiter系列進程。

RabbitMQ網絡框架時序圖

client建立鏈接后,RabbitMQ通過tcp_acceptor進程處理accept成功后返回的clientfd。

rabbit_reader從TCP鏈接中讀取數據,然后根據協議回調函數處理客戶端的各種請求。

RabbitMQ消息處理流程

RabbitMQ先驗證權限;然后檢查Exchange是否存在,不存在則創建;檢查消息是否合法以及是否需要confirm等;根據路由信息選擇消費隊列;檢查消費隊列是否存在,有則將消息發送給消息隊列;檢查消費者是否存在,存在則將消息發送給消費者client端。

RabbitMQ會根據不同的消息的不同類型做不同的處理:

  1. 不持久化消息,如果沒有消費者則直接丟掉,不會入消費隊列;如果有,則先入消息隊列,按照入隊順序依次發送給消費者。
  2. 持久化消息,將消息持久化成功后才給發送端發ack,然后再發送給消費者。

(完)

 

來自:https://fanchao01.github.io/blog/2018/02/09/rabbitmq-arch/

 

 本文由用戶 mssj0912 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!