數據處理平臺架構中的SMACK組合:Spark、Mesos、Akka、Cassandra以及Kafka

來自: http://dockone.io/article/1023

在今天的文章中,我們將著重探討如何利用SMACK(即Spark、Mesos、Akka、Cassandra以及Kafka)堆棧構建可擴展數據處理平臺。雖然這套堆棧僅由數個簡單部分組成,但其能夠實現大量不同系統設計。除了純粹的批量或者流處理機制之外,我們亦可借此實現復雜的Lambda以及Kappa架構。

基于Mesos技術的數人云可以快速部署和運行Spark、Akka、Cassandra以及Kafka,也歡迎大家在數人云上進行體驗和實踐,感受它們強大功能帶來的便利。在本文開始闡述之前,讓我們首先立足于已有生產項目經驗從設計與示例入手進行說明。

</div>

綜述

Spark - 一套高速通用型引擎,用于實現分布式大規模數據處理任務。

Mesos - 集群資源管理系統,能夠立足于分布式應用程序提供行之有效的資源隔離與共享能力。

Akka - 一套用于在JVM之上構建高并發、分布式及彈性消息驅動型應用程序的工具包與運行時。

Cassandra - 一套分布式高可用性數據庫,旨在跨越多座數據中心處理大規模數據。

Kafka -一套高吞吐能力、低延遲、分布式消息收發系統/提交日志方案,旨在處理實時數據供給。

存儲層: Cassandra

Cassandra一直以其高可用性與高吞吐能力兩大特性而備受矚目,其同時能夠處理極為可觀的寫入負載并具備節點故障容錯能力。以CAP原則為基礎,Cassandra能夠為業務運營提供可調整的一致性/可用性水平。

更有趣的是,Cassandra在處理數據時擁有線性可擴展能力(即可通過向集群當中添加節點的方式實現負載增容)并能夠提供跨數據中心復制(簡稱XDCR)能力。事實上,跨數據中心復制功能除了數據復制,同時也能夠實現以下各類擴展用例:

  • 地理分布式數據中心處理面向特定區域或者客戶周邊位置之數據。
  • 在不同數據中心之間者數據遷移,從而實現故障后恢復或者將數據移動至新數據中心。
  • 對運營工作負載與分析工作負載加以拆分。

但上述特性也都有著自己的實現成本,而對于Cassandra而言這種成本體現為數據模型——這意味著我們需要通過聚類對分區鍵及入口進行分組/分類,從而實現嵌套有序映射。以下為簡單示例:

CREATE TABLE campaign(

id uuid,

year int,

month int,

day int,

views bigint,

clicks bigint,

PRIMARY KEY (id, year, month, day)

);

INSERT INTO campaign(id, year, month, day, views, clicks)

VALUES(40b08953-a…,2015, 9, 10, 1000, 42);

SELECT views, clicks FROM campaign

WHERE id=40b08953-a… and year=2015 and month>8; </pre>

為了獲取某一范圍內的特定數據,我們必須指定全鍵,且不允許除列表內最后一列之外的其它任何范圍劃定得以執行。這種限制用于針對不同范圍進行多重掃描限定,否則其可能帶來隨機磁盤訪問并拖慢整體性能表現。這意味著該數據模型必須根據讀取查詢進行認真設計,從而限制讀取/掃描量——但這同時也會導致對新查詢的支持靈活性有所下降。

那么如果我們需要將某些表加入到其它表當中,又該如何處理?讓我們考慮下一種場景:針對特定月份對全部活動進行總體訪問量計算。

CREATE TABLE event(

id uuid,

ad_id uuid,

campaign uuid,

ts bigint,

type text,

PRIMARY KEY(id)

);</pre>

在特定模型之下,實現這一目標的惟一辦法就是讀取全部活動、讀取全部事件、匯總各屬性值(其與活動id相匹配)并將其分配給活動。實現這類應用程序操作顯然極具挑戰,因為保存在Casandra中的數據總量往往非常龐大,內存容量根本不足以加以容納。因此我們必須以分布式方式對此類數據加以處理,而Spark在這類用例中將發揮重要作用。

處理層: Spark

Spark的抽象核心主要涉及RDD(即彈性分布式數據集,一套分布式元素集合)以及由以下四個主要階段構成的工作流:

  • RDD操作(轉換與操作)以DAG(即有向無環圖)形式進行
  • DAG會根據各任務階段進行拆分,并隨后被提交至集群管理器
  • 各階段無需混洗/重新分配即可與任務相結合
  • 任務運行在工作程序之上,而結果隨后返回至客戶端

以下為我們如何利用Spark與Cassandra解決上述問題:

val sc = new SparkContext(conf)

case class Event(id: UUID, ad_id: UUID, campaign: UUID, ts: Long, type: String)

sc.cassandraTableEvent

.filter(e => e.type == "view" && checkMonth(e.ts))

.map(e => (e.campaign, 1))

.reduceByKey( + )

.collect()</pre>

指向Cassandra的交互通過Spark-Cassandra-連接器負責執行,其能夠讓整個流程變得更為直觀且簡便。另有一個非常有趣的選項能夠幫助大家實現對NoSQL存儲內容的交互——SparkSQL,其能夠將SQL語句翻譯成一系列RDD操作。

case class CampaignReport(id: String, views: Long, clicks: Long)

sql("""SELECT campaign.id as id, campaign.views as views,

campaign.clicks as clicks, event.type as type

FROM campaign

JOIN event ON campaign.id = event.campaign

""").rdd

.groupBy(row => row.getAsString)

.map{ case (id, rows) =>

val views = rows.head.getAsLong

val clicks = rows.head.getAsLong

val res = rows.groupBy(row => row.getAsString).mapValues(_.size)

CampaignReport(id, views = views + res("view"), clicks = clicks + res("click"))

}.saveToCassandra(“keyspace”, “campaign_report”)</pre>

通過幾行代碼,我們已經能夠實現原生Lambda設計——其復雜度顯然較高,但這一示例表明大家完全有能力以簡單方式實現既定功能。

類MapReduce解決方案:拉近處理與數據間的距離

Spark-Cassandra連接器擁有數據位置識別能力,并會從集群內距離最近的節點處讀取數據,從而最大程度降低數據在網絡中的傳輸需求。為了充分發揮Spark-C*連接器的數據位置識別能力,大家應當讓Spark工作程序與Cassandra節點并行協作。

除了Spark與Cassandra的協作之外,我們也有理由將運營(或者高寫入強度)集群同分析集群區分開來,從而保證:

  • 不同集群能夠獨立進行規模伸縮
  • 數據由Cassandra負責復制,而無需其它機制介入
  • 分析集群擁有不同的讀取/寫入負載模式
  • 分析集群能夠容納額外數據(例如詞典)與處理結果
  • Spark對資源的影響只局限于單一集群當中

下面讓我們再次回顧Spark的應用程序部署選項:

目前我們擁有三種主要集群資源管理器選項可供選擇:

單獨使用Spark——Spark作為主體,各工作程序以獨立應用程序的形式安裝并執行(這明顯會增加額外資源負擔,且只支持為每工作程序分配靜態資源)

如果大家已經擁有Hadoop生態系統,那么YARN絕對是個不錯的選項

Mesos自誕生之初就在設計中考慮到對集群資源的動態分配,而且除了Hadoop應用程序之外,同時也適合處理各類異構工作負載

Mesos架構

Mesos集群由各主節點構成,它們負責資源供應與調度,而各從節點則實際承擔任務執行負載。在HA模式當中,我們利用多個主ZooKeeper節點負責進行主節點選擇與服務發現。Mesos之上執行的各應用程序被稱為“框架(Framework)”,并利用API處理資源供應及將任務提交至Mesos。總體來講,其任務執行流程由以下幾個步驟構成:

-從節點為主節點提供可用資源

-主節點向框架發送資源供應

-調度程序回應這些任務及每任務資源需求

-主節點將任務發送至從節點

將Spark、Mesos以及Cassandra加以結合

正如之前所提到,Spark工作程序應當與Cassandra節點協作,從而實現數據位置識別能力以降低網絡流量與Cassandra集群負載。下圖所示為利用Mesos實現這一目標的可行部署場景示例:

-Mesos主節點與ZooKeeper協作

-Mesos從節點與Cassandra節點協作,從而為Spark提供更理想的數據位置

-Spark二進制文件部署至全部工作節點當中,而spark-env.sh則配置以合適的主端點及執行器jar位置

park執行器JAR被上傳至S3/HDFS當中

根據以上設置流程Spark任務可利用簡單的spark-submit調用從任意安裝有Spark二進制文件并上傳有包含實際任務邏輯jar的工作節點被提交至集群中。

spark-submit --class io.datastrophic.SparkJob /etc/jobs/spark-jobs.jar  

由于現有選項已經能夠運行Docker化Spark,因此我們不必將二進制文件分發至每個單一集群節點當中。

定期與長期運行任務之執行機制

每套數據處理系統遲早都要面對兩種必不可少的任務運行類別:定期批量匯聚型定期/階段性任務以及以數據流處理為代表的長期任務。這兩類任務的一大主要要求在于容錯能力——各任務必須始終保持運行,即使集群節點發生故障。Mesos提供兩套出色的框架以分別支持這兩種任務類別。

Marathon是一套專門用于實現長期運行任務高容錯性的架構,且支持與ZooKeeper相配合之HA模式。其能夠運行Docker并提供出色的REST API。以下shell命令示例為通過運行spark-submit實現簡單任務配置:

Chronos擁有與Marathon相同的特性,但其設計目標在于運行定期任務,而且總體而言其分布式HA cron支持任務圖譜。以下示例為利用簡單的bash腳本實現S3壓縮任務配置:

目前已經有多種框架方案可供選擇,或者正處于積極開發當中以對接各類系統中所廣泛采用的Mesos資源管理功能。下面列舉其中一部分典型代表:

-Hadoop

-Cassandra

-Kafka

-Myriad: YARN on Mesos

-Storm

-Samza

數據提取

到目前為止可謂一切順利:存儲層已經設計完成,資源管理機制設置妥當,而各任務亦經過配置。接下來惟一要做的就是數據處理工作了。

假定輸入數據將以極高速率涌來,這時端點要順利應對就需要滿足以下要求:

-提供高吞吐能力/低延遲

-具備彈性

-可輕松實現規模擴展

-支持背壓

背壓能力并非必需,不過將其作為選項來應對負載峰值是個不錯的選擇。

Akka能夠完美支持以上要求,而且基本上其設計目標恰好是提供這套功能集。下面來看Akka的特性:

-JVM面向JVM的角色模型實現能力

-基于消息且支持異步架構

-強制執行非共享可變狀態

-可輕松由單一進程擴展至設備集群

-利用自上而下之監督機制實現角色層級

-不僅是并發框架:akka-http、akka-stream以及akka-persistence

以下簡要示例展示了三個負責處理JSON HttpRequest的角色,它們將該請求解析為域模型例類,并將其保存在Cassandra當中:

class HttpActor extends Actor {

def receive = {

case req: HttpRequest =>

system.actorOf(Props[JsonParserActor]) ! req.body

case e: Event =>

system.actorOf(Props[CassandraWriterActor]) ! e

}

}

class JsonParserActor extends Actor {

def receive = {

case s: String => Try(Json.parse(s).as[Event]) match {

case Failure(ex) => //error handling code

case Success(event) => sender ! event

}

}

}

class CassandraWriterActor extends Actor with ActorLogging {

//for demo purposes, session initialized here

val session = Cluster.builder()

.addContactPoint("cassandra.host")

.build()

.connect()

override def receive: Receive = {

case event: Event =>

val statement = new SimpleStatement(event.createQuery)

.setConsistencyLevel(ConsistencyLevel.QUORUM)

Try(session.execute(statement)) match {

case Failure(ex) => //error handling code

case Success => sender ! WriteSuccessfull

}

}</pre>

}

看起來只需幾行代碼即可實現上述目標,不過利用Akka向Cassandra當中寫入原始數據(即事件)卻有可能帶來以下問題:

-Cassandra的設計思路仍然偏重高速交付而非批量處理,因此必須對輸入數據進行預匯聚。

-匯聚/匯總所帶來的計算時間會隨著數據總量的增長而逐步加長。

-由于采用無狀態設計模式,各角色并不適合用于執行匯聚任務。

-微批量機制能夠在一定程度上解決這個難題。

-仍然需要為原始數據提供某種可靠的緩沖機制

Kafka充當輸入數據之緩沖機制

為了保留輸入數據并對其進行預匯聚/處理,我們也可以使用某種類型的分布式提交日志機制。在以下用例中,消費程序將批量讀取數據,對其進行處理并將其以預匯聚形式保存在Cassandra當中。該示例說明了如何利用akka-http通過HTTP將JSON數據發布至Kafka當中:

val config = new ProducerConfig(KafkaConfig())

lazy val producer = new KafkaProducerA, A

val topic = “raw_events”

val routes: Route = {

post{

decodeRequest{

entity(as[String]){ str =>

JsonParser.parse(str).validate[Event] match {

 case s: JsSuccess[String] => producer.send(new KeyedMessage(topic, str))

 case e: JsError => BadRequest -> JsError.toFlatJson(e).toString()

}

}

}

}

}

object AkkaHttpMicroservice extends App with Service {

Http().bindAndHandle(routes, config.getString("http.interface"), config.getInt("http.port"))</pre>

}

數據消費:Spark Streaming

盡管Akka也能夠用于消耗來自Kafka的流數據,但將Spark納入生態系統以引入Spark Streaming能夠切實解決以下難題:

-其支持多種數據源

-提供“至少一次”語義

-可在配合Kafka Direct與冪等存儲實現“僅一次”語義

以下代碼示例闡述了如何利用Spark Streaming消費來自Kinesis的事件流:

val ssc = new StreamingContext(conf, Seconds(10))

val kinesisStream = KinesisUtils.createStream(ssc,appName,streamName,

endpointURL,regionName, InitialPositionInStream.LATEST,

Duration(checkpointInterval), StorageLevel.MEMORY_ONLY)

}

//transforming given stream to Event and saving to C*

kinesisStream.map(JsonUtils.byteArrayToEvent)

.saveToCassandra(keyspace, table)

ssc.start()

ssc.awaitTermination() </pre>

故障設計:備份與補丁安裝

通常來講,故障設計是任何系統當中最為枯燥的部分,但其重要性顯然不容質疑——當數據中心不可用或者需要對崩潰狀況加以分析時,盡可能保障數據免于丟失可謂至關重要。

那么為什么要將數據存儲在Kafka/Kinesis當中?截至目前,Kinesis仍然是惟一在無需備份的情況下能夠確保全部處理結果丟失后保留數據的解決方案。雖然Kafka也能夠支持數據長期保留,但硬件持有成本仍是個需要認真考慮的問題,因為S3存儲服務的使用成本要遠低于支持Kafka所需要的大量實例——另外,S3也提供非常理想的服務水平協議。

除了備份能力,恢復/補丁安裝策略還應當考慮到前期與測試需求,從而保證任何與數據相關的問題能夠得到迅速解決。程序員們在匯聚任務或者重復數據刪除操作中可能不慎破壞計算結果,因此修復這類錯誤的能力就變得非常關鍵。簡化這類操作任務的一種簡便方式在于在數據模型當中引入冪等機制,這樣同一操作的多次重復將產生相同的結果(例如SQL更新屬于冪等操作,而計數遞增則不屬于)。

以下示例為Spark任務讀取S3備份并將其載入至Cassandra:

val sc = new SparkContext(conf)

sc.textFile(s"s3n://bucket/2015//.gz")

.map(s => Try(JsonUtils.stringToEvent(s)))

.filter(.isSuccess).map(.get)

.saveToCassandra(config.keyspace, config.table)</pre>

宏觀構成

利用SMACK構建數據平臺頂層設計

縱觀全文,SMACK堆棧的卓越能力包括:

  • 簡明的工具儲備以解決范圍極廣的各類數據處理場景
  • 軟件方案久經考驗且擁有廣泛普及度,背后亦具備強大的技術社區
  • 易于實現規模伸縮與數據復制,且提供較低延遲水平
  • 統一化集群管理以實現異構負載
  • 可面向任意應用程序類型的單一平臺
  • 面向不同架構設計(批量、流數據、Lambda、Kappa)的實現平臺
  • 出色的產品發布速度(例如用于MVP驗證)
  • </ul> </div>

 本文由用戶 電子天府 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!