SyncSpout:用來構造可交互的、同步的 Storm 拓撲的組件

LidDarley 7年前發布 | 10K 次閱讀 大數據 分布式/云計算/大數據

SyncSpout簡介

SyncSpout是上海華瑞銀行(SHRB)大數據團隊開發的,用來構造可交互的、同步的Storm拓撲的組件。我們在做實時推薦系統中,希望將Storm的并發性和分布式計算能力應用到“請求-響應”范式中, 比如客戶的某次購買行為能夠以消息的形式發送到storm拓撲中,storm在指定時間返回推薦結果,也就是說storm需要具有可交互性。基于這樣的背景,大數據團隊開發了SyncSpout組件, 該組件可以接收客戶端異步的消息,經過Storm拓撲異步計算,在指定時間內返回給客戶端。

架構圖

關鍵組件介紹

  • SyncSpout:繼承storm的IRichSpout,用于接收客戶端調用消息并將消息emit出去的Spout
  • SendBolt:拓撲中發送計算結果的bolt,該bolt將計算結果返回給客戶端
  • SyncSpoutClient:用于向SyncSpout發送同步消息,并在指定時間內獲取結果

特性

  • 使普通的storm應用可交互
  • storm應用重啟后,客戶端可自動重連
  • 對storm應用幾乎沒有侵入,對業務沒有侵入
  • storm集群返回的計算結果能夠準確的返回給指定客戶端的某次調用
  • 客戶端可發送任意類型的消息給storm應用;storm應用可返回任意類型的消息給客戶端
  • 客戶端可在指定時間內同步獲取storm應用返回的計算結果
  • 支持高并發,在單機環境下1000并發量基本在100毫秒內返回

與Storm官方DRPC的異同

  • 都能接收一個遠程請求,發送請求到storm拓撲,從storm拓撲接收結果,發送結果回等待的客戶端
  • DRPC只能處理字符串;SyncSpout可以處理任意可序列化的類型
  • DRPC僅能處理“線性的”DRPC拓撲,計算以一連串步驟的形式表達;SyncSpout能夠處理任意類型的storm拓撲
  • DRPC的功能被移植到了Trident中,從原生Storm被廢棄了;SyncSpout會被SHRB一直維護

用法

客戶端

// 創建客戶端
val client = new SyncSpoutClient(topName)
// 初始化
client.init()
// 向遠程storm集群發送消息,并在1000毫秒內返回,若超時則返回null指針
val syncResult = client.ask(ClientMsg("這是發送的消息,可以是任意類型"),1000).asInstanceOf[String]
println(s"返回消息是[$syncResult],可以是任意類型")

storm集群

val builder = new TopologyBuilder()
// ActorSpout用于接收消息
builder.setSpout("syncSpout",SyncSpout(),2)
// SimpleBolt用于處理消息
builder.setBolt("simpleBolt",new SimpleBolt(),2).setNumTasks(4).shuffleGrouping("syncSpout")
// SendBolt用于返回消息
builder.setBolt("sendBolt",new SendBolt(),2).shuffleGrouping("simpleBolt")
val cluster = new LocalCluster()
val topName = "SyncSpoutTop"
val conf = new Config()
conf.setNumWorkers(2)
cluster.submitTopology(topName,conf,builder.createTopology())
println( "SyncSpout 啟動成功!" )

注意點

  • 客戶端實例化時的topName就是storm集群中的名稱
  • sync-spout-server.conf、sync-spout-client.conf中需要配置zookeeper的host列表

引用第三方類庫

 

 

 本文由用戶 LidDarley 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!