RabbitMQ架構
RabbitMQ是一個高可用的消息中間件,支持多種協議和集群擴展。并且支持消息持久化和鏡像隊列,適用于對消息可靠性較高的場合,基本模型如下。
其客戶端使用方式
from kombu import Connection, Exchange, Queue
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
def process_media(body, message):
print body
message.ack()
# connections
with Connection('amqp://guest:guest@localhost//') as conn:
# produce
producer = conn.Producer(serializer='json')
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
exchange=media_exchange, routing_key='video',
declare=[video_queue])
# the declare above, makes sure the video queue is declared
# so that the messages can be delivered.
# It's a best practice in Kombu to have both publishers and
# consumers declare the queue. You can also declare the
# queue manually using:
# video_queue(conn).declare()
# consume
with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
# Process messages and handle events on all channels
while True:
conn.drain_events()
示例中的發布端和消費端是同一方,而實際中的使用方式一般有多種場景,topic模式、fanout模式、direct模式和RPC模式。
- topic模式,按照設置的路由信息(routing key)將消息路由到一個或者多個消費端,而消息只能由一個消費者消費一次。一個消費者可以設置多個路由信息,可以同時獲取多個消費者發送的消息;
- fanout模式,與topic模式唯一的區別是同一消息會發送到訂閱(binding)的多個消費者;
- direct模式,一對一模式,實際中比較少用;
- RPC模式,結合topic和direct模式,發送消息的同時指定要接受的消息。
RabbitMQ監控樹
為了高可靠,Erlang中實際的工作進程(Erlang進程,并不是系統進程)都有一個監控進程,監控進程負責(一個或多個)工作進程的創建、銷毀和重啟。監控進程和工作進程的關系如圖。
- 方塊圖是監控進程;
- 圓圈是工作進程;
- 方塊中的”1“(one_for_one)和”a“(one_for_all)代表不同的監控策略
one_for_one 監控策略,一個工作進程崩潰,則只重啟崩潰的工作進程。
one_for_all監控策略,一個工作進程崩潰,則銷毀并重啟所有工作進程
在RabbitMQ中還有一種 simple_one_for_one監控策略 ,與 one_for_one監控策略 相同,只不過重啟工作進程時的啟動參數是固定的。RabbitMQ網絡框架也遵循該原則。
RabbitMQ消息架構
當client端鏈接服務器時,RabbitMQ會啟動一系列監控和工作進程來處理網絡連接。
為了降低TCP鏈接數量,多個消費者共享同一個鏈接Connection,但是每個消費者獨享一個管道channel,用consumer_tag標識。consumer_tag在Connection唯一,從1開始累加,當重連接時需要匹配該tag。每個消費者對應獨立的一套rabbit_channel_sup_sup->rabbit_channel_sup->rabbit_channel|rabbit_writer|rabbit_limiter系列進程。
RabbitMQ網絡框架時序圖
client建立鏈接后,RabbitMQ通過tcp_acceptor進程處理accept成功后返回的clientfd。
rabbit_reader從TCP鏈接中讀取數據,然后根據協議回調函數處理客戶端的各種請求。
RabbitMQ消息處理流程
RabbitMQ先驗證權限;然后檢查Exchange是否存在,不存在則創建;檢查消息是否合法以及是否需要confirm等;根據路由信息選擇消費隊列;檢查消費隊列是否存在,有則將消息發送給消息隊列;檢查消費者是否存在,存在則將消息發送給消費者client端。
RabbitMQ會根據不同的消息的不同類型做不同的處理:
- 不持久化消息,如果沒有消費者則直接丟掉,不會入消費隊列;如果有,則先入消息隊列,按照入隊順序依次發送給消費者。
- 持久化消息,將消息持久化成功后才給發送端發ack,然后再發送給消費者。
(完)
來自:https://fanchao01.github.io/blog/2018/02/09/rabbitmq-arch/