SyncSpout:用來構造可交互的、同步的 Storm 拓撲的組件
SyncSpout簡介
SyncSpout是上海華瑞銀行(SHRB)大數據團隊開發的,用來構造可交互的、同步的Storm拓撲的組件。我們在做實時推薦系統中,希望將Storm的并發性和分布式計算能力應用到“請求-響應”范式中, 比如客戶的某次購買行為能夠以消息的形式發送到storm拓撲中,storm在指定時間返回推薦結果,也就是說storm需要具有可交互性。基于這樣的背景,大數據團隊開發了SyncSpout組件, 該組件可以接收客戶端異步的消息,經過Storm拓撲異步計算,在指定時間內返回給客戶端。
架構圖
關鍵組件介紹
- SyncSpout:繼承storm的IRichSpout,用于接收客戶端調用消息并將消息emit出去的Spout
- SendBolt:拓撲中發送計算結果的bolt,該bolt將計算結果返回給客戶端
- SyncSpoutClient:用于向SyncSpout發送同步消息,并在指定時間內獲取結果
特性
- 使普通的storm應用可交互
- storm應用重啟后,客戶端可自動重連
- 對storm應用幾乎沒有侵入,對業務沒有侵入
- storm集群返回的計算結果能夠準確的返回給指定客戶端的某次調用
- 客戶端可發送任意類型的消息給storm應用;storm應用可返回任意類型的消息給客戶端
- 客戶端可在指定時間內同步獲取storm應用返回的計算結果
- 支持高并發,在單機環境下1000并發量基本在100毫秒內返回
與Storm官方DRPC的異同
- 都能接收一個遠程請求,發送請求到storm拓撲,從storm拓撲接收結果,發送結果回等待的客戶端
- DRPC只能處理字符串;SyncSpout可以處理任意可序列化的類型
- DRPC僅能處理“線性的”DRPC拓撲,計算以一連串步驟的形式表達;SyncSpout能夠處理任意類型的storm拓撲
- DRPC的功能被移植到了Trident中,從原生Storm被廢棄了;SyncSpout會被SHRB一直維護
用法
客戶端
// 創建客戶端
val client = new SyncSpoutClient(topName)
// 初始化
client.init()
// 向遠程storm集群發送消息,并在1000毫秒內返回,若超時則返回null指針
val syncResult = client.ask(ClientMsg("這是發送的消息,可以是任意類型"),1000).asInstanceOf[String]
println(s"返回消息是[$syncResult],可以是任意類型")
storm集群
val builder = new TopologyBuilder()
// ActorSpout用于接收消息
builder.setSpout("syncSpout",SyncSpout(),2)
// SimpleBolt用于處理消息
builder.setBolt("simpleBolt",new SimpleBolt(),2).setNumTasks(4).shuffleGrouping("syncSpout")
// SendBolt用于返回消息
builder.setBolt("sendBolt",new SendBolt(),2).shuffleGrouping("simpleBolt")
val cluster = new LocalCluster()
val topName = "SyncSpoutTop"
val conf = new Config()
conf.setNumWorkers(2)
cluster.submitTopology(topName,conf,builder.createTopology())
println( "SyncSpout 啟動成功!" )
注意點
- 客戶端實例化時的topName就是storm集群中的名稱
- sync-spout-server.conf、sync-spout-client.conf中需要配置zookeeper的host列表
引用第三方類庫
- zkclient: https://github.com/yuluows/zkclient.git
- MPSC隊列:參考akka_2.11-2.4.11的AbstractBoundedNodeQueue類
本文由用戶 LidDarley 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!