聊聊Redis的訂閱發布

ekfb 8年前發布 | 14K 次閱讀 Redis NoSQL數據庫

來自: http://andrewliu.in/2016/01/30/聊聊Redis的訂閱發布/

本博客采用創作共用版權協議, 要求署名、非商業用途和保持一致. 轉載本博客文章必須也遵循 署名-非商業用途-保持一致 的創作共用協議.

為什么做訂閱分布?

隨著業務復雜, 業務的項目依賴關系增強, 使用消息隊列幫助系統 降低耦合度 .

  • 訂閱分布本身也是一種生產者消費者模式, 訂閱者是消費者, 發布者是生產者.
  • 訂閱發布模式, 發布者發布消息后, 只要有訂閱方, 則多個訂閱方會收到同樣的消息
  • 生產者消費者模式, 生產者往隊列里放入消息, 由多個消費者對一條消息進行搶占.

  • 訂閱分布模式可以將一些不著急完成的工作放到其他進程或者線程中進行離線處理.

Redis中的訂閱發布

Redis中的訂閱發布模式, 當沒有訂閱者時, 消息會被直接丟棄(Redis不會持久化保存消息)

Redis生產者消費者

生產者使用Redis中的list數據結構進行實現, 將待處理的消息塞入到消息隊列中.

classProducer(object):

def__init__(self, host="localhost", port=6379):
 self._conn = redis.StrictRedis(host=host, port=port)
 self.key = "test_key"
 self.value = "test_value_{id}"

defproduce(self):
foridinxrange(5):
 msg = self.value.format(id=id)
 self._conn.lpush(self.key, msg)

消費者使用 redis中brpop 進行實現, brpop會從list頭部消息, 并能夠設置超時等待時間.

classConsumer(object):

def__init__(self, host="localhost", port=6379):
 self._conn = redis.StrictRedis(host=host, port=port)
 self.key = "test_key"

defconsume(self, timeout=0):
# timeout=0 表示會無線阻塞, 直到獲得消息
whileTrue:
 msg = self._conn.brpop(self.key, timeout=timeout)
 process(msg)


defprocess(msg):
printmsg

if__name__ =='__main__':
 consumer = Consumer()
 consumer.consume()
# 輸出結果
('test_key','test_value_1')
('test_key','test_value_2')
('test_key','test_value_3')
('test_key','test_value_4')
('test_key','test_value_5')

Redis中訂閱發布

在Redis Pubsub中, 一個頻道(channel)相當于一個消息隊列

classPublisher(object):

def__init__(self, host, port):
 self._conn = redis.StrictRedis(host=host, port=port)
 self.channel = "test_channel"
 self.value = "test_value_{id}"

defpub(self):
foridinxrange(5):
 msg = self.value.format(id=id)
 self._conn.publish(self.channel, msg)

其中 get_message 使用了 select IO多路復用來檢查socket連接是否是否可讀.

classSubscriber(object):

def__init__(self, host="localhost", port=6379):
 self._conn = redis.StrictRedis(host=host, port=port)
 self._pubsub = self._conn.pubsub() # 生成pubsub對象
 self.channel = "test_channel"
 self._pubsub.subscribe(self.channel)

defsub(self):
whileTrue:
 msg = self._pubsub.get_message()
ifmsgandisinstance(msg.get("data"), basestring):
 process(msg.get("data"))

defclose(self):
 self._pubsub.close()

# 輸出結果
test_value_1
test_value_2
test_value_3
test_value_4
test_value_5

Java Jedis踩過的坑

在Jedis中訂閱方處理是采用同步的方式, 看源碼中 PubSub模塊的process函數

在 do-while 循環中, 會等到當前消息處理完畢才能夠處理下一條消息, 這樣會導致當入隊列消息量過大的時候, redis鏈接被強制關閉.

解決方案: 將整個處理函數改為異步的方式.

 本文由用戶 ekfb 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!