用Apache Spark進行大數據處理——第三部分:Spark流
來自: http://www.infoq.com/cn/articles/apache-spark-streaming
介紹
在“用Apache Spark進行大數據處理”系列的前兩篇文章中,我們看到了Apache Spark框架是什么(第一部分)還有如何使用Spark SQL庫訪問數據的SQL接口(第二部分)。
這些方案是基于批處理模式下靜態信息處理的,比如作為一個按小時或天運行的任務。但若是在數據驅動的業務決策場景下,當需要飛快地分析實時數據流以執行分析并創建決策支持時,又該如何呢?
使用流式數據處理,一旦數據到達計算就會被實時完成,而非作為批處理任務。實時數據處理與分析正在變為大多數組織的大數據戰略中至關重要的一個組件。 在本文中,我們將會學習到如何使用Apache Spark中一個被稱為 Spark流 的庫進行實時數據分析。
我們將會看到一個網絡服務器日志分析用例,該用例會向我們展示Spark流是如何幫助我們對持續產生的數據流進行分析的。
流數據分析
流數據基本上是一組連續的數據記錄,它們通常產生于諸如傳感器、服務器流量與在線搜索等數據源。常見的流數據的例子有網站上的用戶行為、監控數據、服務器日志與其他事件數據。
流數據處理應用會有助于現場面板、實時在線推薦與即時詐騙檢測。
如果我們正在構建一個實時收集、處理與分析流數據的應用,我們需要按照與批處理數據應用不同的設計視角進行考慮。
下面列出了三種不同的流數據處理框架:
在本文中我們將專注于Spark流。
Spark流
Spark流是核心Spark API的擴展。Spark流使得基于實時數據流構建容錯性處理變得更加簡單。下面的圖1展示了Spark流是如何融入到整個Apache Spark生態系統中。
(點擊放大圖像)
圖1.具有Spark流庫的Spark生態系統
Spark流工作的方式是將數據流按照預先定義的間隔(N秒)劃分為批(稱微批次)然后將每批數據視為一個 彈性分布式數據集 (Resilient Distributed Datasets,RDDs)。隨后我們就可以使用諸如map、reduce、reduceByKey、join和window這樣的操作來處理這些RDDs。這些RDD操作的結果會以批的形式返回。通常我們會將這些結果保存到數據存儲中以供未來分析并生成報表與面板,或是發送基于事件的預警。
為Spark流決定時間間隔是很重要的,這需要基于你的用例與數據處理要求。如果值N太低,那么在分析階段微批次就沒有足夠的數據以給出有意義的結果。
與Spark流相比,其他流處理框架是基于每個事件而非一個微批次來處理數據流的。用微批次的方法,我們可以在同一應用下使用Spark流API來應用其他Spark庫(比如核心、機器學習等)。
流數據可以來源于許多不同的數據源。下面列出一些這樣的數據源:
使用諸如Apache Spark這種大數據處理框架的另外一個優勢就是我們可以在同一系統中組合批處理與流處理。我們也可以在數據流上應用Spark的機器學習與圖處理算法。在本系列的后續文章當中,我們將會討論被稱為 MLlib 和 GraphX 的機器學習與圖處理庫。
Spark流結構如下圖2所示。
(點擊放大圖像)
圖2.Spark流如何工作
Spark流用例
Spark流正在變為實現實時數據處理與分析方案的首選平臺,這些實時數據往往來源于物聯網(Internet of Things,IoT)和傳感器。它被用于各種用例與商業應用。
下面是一些最有趣的 Spark流用例 :
- Uber ,車駕共享服務背后的公司,在他們的持續流式ETL管道中使用了Spark流以每天從其移動用戶處收集TB級的事件數據來進行實時遙測分析。
- Pinterest ,可視化書簽工具背后的公司,使用Spark流、MemSQL與Apache Kafka技術以實時地深入了解他們全球的用戶是怎樣使用Pins的。
- Netflix 使用Kafka與Spark流來構建一個實時在線電影推薦與數據監控解決 方案 ,該方案每天要處理來自于不同數據源的數十億條事件。
Spark流其他現實世界的樣例還包括:
- 供應鏈分析
- 實時安全情報操作以尋找威脅
- 廣告競價平臺
- 實時視頻分析,以幫助觀看者實現個性化與互動體驗
讓我們看一下Spark流的架構與API方法。若要編寫Spark流程序,我們需要知曉兩個組件:DStream與流上下文。
DStream
Dstream (離散流,Discretized Stream,的縮寫)是Spark流中最基本的抽象,它描述了一個持續的數據流。DStream既可以從諸如Kafka、Flume與Kinesis這樣的數據源中創建,也可以對其他DStream實施操作。在內部,一個DStream被描述為一個RDD對象的序列。
與RDDs上的轉換與動作操作類似,DStream支持以下 操作 :
- map
- flatMap
- filter
- count
- reduce
- countByValue
- reduceByKey
- join
- updateStateByKey
流上下文
與Spark中的 Spark上下文(SparkContext) 相似, 流上下文(StreamingContext) 是所有流功能的主入口。
流上下文擁有內置方法可以將流數據接收到Spark流程序中。
使用該上下文,我們可以創建一個描述基于TCP數據源的流數據的DStream,可以用主機名與端口號指定TCP數據源。比如,如果我們使用像netcat這樣的工具來測試Spark流程序的話,我們將會從運行netcat的機器(比如localhost)的9999端口上接收到數據流。
當代碼被執行,在啟動時,Spark流僅是設置將要執行的計算,此時還沒有進行實時處理。在所有的轉換都被設置完畢后,為了啟動處理,我們最終會調用start()方法來啟動計算,還有awaitTermination()方法來等待計算終結。
Spark流API
Spark流附帶了若干個用于處理數據流的API方法。有類似于RDD的操作,比如map、flatMap、filter、count、reduce、groupByKey、reduceByKey、sortByKey和join。它也提供了其他基于window與stateful操作的處理流數據的API。包括window、countByWindow、reduceByWindow、countByValueAndWindow、reduceByKeyAndWindow和updateStateByKey。Spark流庫當前支持Scala、Java和Python編程語言。這里是每個語言對應的Spark流API鏈接:
Spark編程的步驟
在我們討論樣例應用之前,先來看看Spark流編程中與眾不同的步驟:
- Spark流上下文被用于處理實時數據流。因此,第一步就是用兩個參數初始化流上下文對象,Spark上下文和切片間隔時間。切片間隔設置了流中我們處理輸入數據的更新窗口。一旦上下文被初始化,就無法再向已經存在的上下文中定義或添加新的計算。并且,在同一時間只有一個流上下文對象可以被激活。
- 當Spark流上下文被定義后,我們通過創建輸入DStreams來指定輸入數據源。在我們的樣例應用中,輸入數據源是一個使用了Apache Kafka分布式數據庫和消息系統的日志消息生成器。日志生成器程序創建隨機日志消息以模擬網絡服務器的運行時環境,作為各種網絡應用服務用戶而產生的流量,日志消息被持續不斷地生成。
- 使用map和reduce這樣的Spark流變換API為DStreams定義計算。
- 當流計算邏輯被定義好后,我們可以使用先前創建的流上下文對象中的start方法來開始接收并處理數據。
- 最終,我們使用流上下文對象的awaitTermination方法等待流數據處理完畢并停止它。
樣例應用
在本文中我們討論的樣例應用是一個服務器日志處理與分析程序。它可以被用于對服務器日志進行實時監控并執行基于這些日志的數據分析。這些日志消息被認為是 時序數據 ,也就是由在一個指定時間間隔內所捕捉到的連續度量的數據點組成的序列。
時序數據的例子包括傳感器數據、天氣信息和點擊流數據。時序分析就是處理時序數據以提取有助于制定業務決策的信息。該數據也可以被用于基于歷史數據的預測分析。
使用這樣的方案,我們不需要每小時或每天的批處理任務來處理服務器日志。Spark流接收持續產生的數據,對其進行處理并計算日志統計,以此來挖掘數據。
為了遵循服務器日志分析的標準樣例,我們將會使用在Data Bricks Spark流 參考應用 中所討論的Apache日志分析器作為我們樣例應用的參考。該應用已經具備將在我們的應用中被重用的日志消息解析代碼。這個參考應用是一個用來學習Spark通用框架以及Spark流的優秀資源。
點擊他們的 網站 ,以查看更多關于Databricks Spark參考應用的細節。
用例
樣例應用的用例是一個網絡服務器日志分析與統計的生成器。在樣例應用中,我們分析網絡服務器日志以計算如下統計信息,這些信息有助于進一步的數據分析和報表及面板的創建:
- 不同HTTP響應代碼的響應計數
- 響應內容大小
- 導致最高網絡流量的訪問客戶端的IP地址
- 最熱門的終端URL以識別那些比其他服務被訪問的更多服務
與本系列的前兩篇文章不同,在本文中我們將使用Java而非Scala來創建Spark程序。我們按照獨立應用的方式運行程序,而不是在控制臺窗口中運行代碼。在測試與產品環境中部署Spark程序也如此。Shell控制臺接口(使用Scala、Python或R語言)僅僅是用于開發者本地測試而已。
技術
在樣例程序中我們將使用如下的技術來演示如何使用Spark流庫處理實時數據流。
Zookeeper
Zookeeper 是一個為分布式應用提供可靠分布式協調的集中化的服務。Kafka,我們在樣例應用中使用的消息系統,依賴于Zoopkeeper在整個集群中的詳細設置。
Kafka
Apache Kafka 是一個實時的、容錯的、可擴展的消息系統,它用于實時地移動數據。對于諸如捕捉網站上用戶活動、日志、股票行情數據以及儀表數據這些用例來說,它是一個很好的選擇。
Kafka的工作方式類似于分布式數據庫,它是基于被分區和復制的低延遲提交日志的。當我們將一個消息發送給Kafka,在集群中它會被復制給不同的服務器,與此同時它也會被提交到磁盤。
Apache Kafka包含客戶端API以及一個稱為Kafka連接的數據轉換器框架。
Kafka客戶端:Kafka包括Java客戶端(針對消息生產者與消費者)。在我們的樣例應用中我們將會使用Java生產者客戶端API。
Kafka連接:Kafka也包含了 Kafka連接 ,即一個介于Apache Kafka與外部數據系統之間的流數據框架,它可以支持組織內的數據管道。它包含了導入與導出連接器以將數據集移入或移出Kafka。Kafka連接程序可以作為獨立進程或分布式服務運行,它支持REST接口的方式,即使用REST API提交連接器到Kafka連接集群。
Spark流
我們將會使用Spark流Java API來接收數據流,計算日志統計信息并且運行查詢以回答諸如“最多網絡請求來自于哪個IP地址”這樣的問題。下面的表1展示了樣例應用中所使用的技術與工具以及他們的版本。
技術 |
版本 |
URL |
Zookeeper |
3.4.6 |
https://zookeeper.apache.org/doc/r3.4.6/ |
Kafka |
2.10 |
http://kafka.apache.org/downloads.html |
Spark 流 |
1.4.1 |
https://spark.apache.org/releases/spark-release-1-4-1.html |
JDK |
1.7 |
http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html |
Maven |
3.3.3 |
http://archive.apache.org/dist/maven/maven-3/3.3.3/ |
表1.Spark流樣例應用技術及工具
在圖3中演示了Spark流樣例應用中不同架構組件。
(點擊放大圖像)
圖3.Spark流樣例應用架構
Spark流應用運行時
為了在本地設置Java項目,可以從Github上下載 Databricks參考應用代碼 。一旦獲取了參考應用代碼,就需要兩個額外的Java類來運行我們的樣例應用。
- 日志生成器(SparkStreamingKafkaLogGenerator.java)
- 日志分析器(SparkStreamingKafkaLogAnalyzer.java)
在文章網站上提供了這些文件的zip壓縮包( spark-streaming-kafka-sample-app.zip )。如果你想在你本地機器上運行樣例應用,使用鏈接下載zip文件,抽出Java類并將他們添加到之前步驟中創建的Java項目中。
樣例應用可以被執行在不同的操作系統上。我在Windows和Linux(CentOS VM)環境下都運行了應用。
讓我們看一下應用架構中的每個組件還有執行Spark流程序的步驟。
Zookeeper命令:
在樣例程序中我使用的Zookeeper版本是3.4.6。為了啟動服務器,需要設置兩個環境變量,JAVA_HOME與ZOOKEEPER_HOME來指定JDK和Zookeeper各自的安裝目錄。然后導航到Zookeeper的home目錄并運行如下命令來啟動Zookeeper服務器。
bin\zkServer.cmd
如果你使用的是Linux環境,命令就是:
bin/zkServer.sh start
Kafka服務器命令:
在程序中使用的Kafka版本是2.10-0.9.00,基于Scala2.10版本。在Kafka中所使用的Scala版本是非常重要的,因為若是沒有使用恰當的版本的話,當執行Spark流程序時就會遇到運行時錯誤。這里是啟動Kafka服務器實例的步驟:
- 打開一個新的命令行窗口
- 設置JAVA_HONE與KAFKA_HOME環境變量
- 導航到Kafka的home目錄
- 運行如下命令
bin\windows\kafka-server-start.bat config\server.properties
對于Linux環境,命令如下:bin/kafka-server-start.sh config/server.properties
日志生成器命令:
在我們的樣例應用中下一步就是運行消息日志生成器。
日志生成器以不同的HTTP響應碼(諸如200、401和404)及不同的終端URL創建測試日志消息。
在我們運行日志生成器之前,我們需要創建一個主題(Topic),我們可以將消息寫到里面去。
與之前的步驟類似,打開一個新的命令行窗口,設置JAVA_HOME和KAFKA_HOME環境變量,并且導航到Kafka的home目錄。然后首先運行以下命令來查看在Kafka服務器中已經存在的可用主題。
bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --list
在Linux上:
bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --list
我們將會用以下命令創建一個叫做“spark-streaming-sample-topic”的新主題:
bin\windows\kafka-run-class.bat kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic
在Linux上:
bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic spark-streaming-sample-topic
你可以再次運行list主題命令以查看是否新主題已經被正確創建。當主題已經被創建好后,我們就可以運行日志生成器程序了。通過調用稱為SparkStreamingKafkaLogGenerator的Java類來完成此步驟。日志生成器類接收以下四個參數來指定配置參數:
- 組標識:spark-streaming-sample-group
- 主題:spark-streaming-sample-topic
- 迭代次數:50
- 間隔:1000
打開一個新的命令行窗口來運行日志生成器。我們將要為JDK、Maven和Kafka目錄分別設置三個環境變量(JAVA_HOME、MAVEN_HOME和KAFKA_HOME)。然后導航到樣例項目根目錄(比如c:\dev\projects\spark-streaming-kafka-sample-app)并運行以下命令。
mvn exec:java -Dexec.mainClass=com.sparkstreaming.kafka.example.SparkStreamingKafkaLogGenerator -Dexec.args="spark-streaming-sample-groupid spark-streaming-sample-topic 50 1000"
一旦日志生成器程序運行起來,就應該在控制臺上通過debug消息看到被創建的測試日志消息。這只是個樣例代碼,所以日志消息被隨機地生成以模擬從諸如網絡服務器這種事件源生成的持續不斷的數據流。下面的圖4展示了日志消息生產者還有正在生成的日志消息截屏。
(點擊放大圖像)
圖4.Spark流日志生成器程序輸出
Spark流命令:
這是使用了Spark流API的日志消息消費者。我們使用叫做SparkStreamingKafkaLogAnalyzer的Java類來從Kafka服務器上接收并處理數據流以創建日志統計信息。
Spark流處理服務器日志消息并生成累計日志統計信息,比如網絡請求大小(最小、最大與平均)、響應代碼計數、IP地址與熱點終端。
我們用“local[*]”創建Spark上下文,它會在本地系統中檢測內核的數量并使用它們運行程序。
為了運行Spark流Java類,將會在classpath中用到以下JAR文件:
- kafka_2.10-0.9.0.0.jar
- kafka-clients-0.9.0.0.jar
- metrics-core-2.2.0.jar
- spark-streaming-kafka_2.10-1.4.0.jar
- zkclient-0.3.jar
將上述JAR文件添加到classpath后我用Eclipse IDE運行了程序。日志分析Spark流程序的輸出如圖5。
(點擊放大圖像)
圖5.Spark流日志分析程序輸出
Spark流應用的可視化
當Spark流程序運行的時候,我們可以檢查Spark控制臺來查看Spark任務的細節。
打開一個新的網絡瀏覽器窗口并導航到URL http://localhost:4040 以訪問Spark控制臺。
先看看一些展示Spark流程序統計信息的圖表。
第一個可視化就是任務的DAG(無回路有向圖,Direct Acyclic Grapg),它展示了我們所運行的程序中不同操作的依賴圖,操作有map、window和foreachRDD等。下面的圖6展示了我們樣例程序中Spark流任務的可視化截屏。
(點擊放大圖像)
圖6.Spark流任務的可視化圖形
我們將要看的下一個圖形就是包含了輸入比率的流統計圖,它顯示了每秒的事件數量,以及處理所花費的毫秒數。圖7展示了Spark流程序執行期間的這些統計信息,左面是流數據還沒有產生時的情況,而右邊是數據流被發送到Kafka并且被Spark流消費者處理的情況。
圖7.為樣例程序展示流統計信息的Spark可視化
結論
Spark流庫 ,Apache Spark生態系統中的一部分,用于實時流數據的數據處理。在本文中,我們學習了如何使用Spark流API來處理由服務器日志生成的數據并基于實時數據流執行分析。
下一步是什么
機器學習、預測分析和數據科學在近期都在獲得越來越多的關注,他們都是不同用例下的問題解決方案。 Spark MLlib ,Spark機器學習庫,提供了若干內置方法以使用諸如協同過濾、聚簇與歸類這樣的不同機器學習算法。
在下一篇文章中,我們將會探索Spark MLlib并觀察幾個用例來演示如何利用Spark的數據科學計算能力,它可以使機器學習算法的使用變得更加簡單。
參考
- 用Apache Spark進行大數據處理 -第一部分:介紹
- 用Apache Spark進行大數據處理 -第二部分:Spark SQL
- Apache Spark 主站點
- Spark流站點
- Spark流 編程指南
- Spark流 - Scala代碼示例
- Spark流 - Java代碼示例
- Data Bricks的Apache Spark 參考應用
- 使用Spark流實時地標記并處理數據 - Spark首腦2015大會 介紹
- MapR中Spark流 快速指南
關于作者
Srini Penchikala 目前是一家金融服務機構的軟件架構師,這個機構位于德克薩斯州的奧斯汀。他在軟件系統架構、設計和開發方面有超過20年的經驗。Srini目前正在撰寫一本關于NoSQL數據庫模式的書。他還是曼寧出版社出版的《 Spring Roo in Action 》一書的合著者。他還曾經出席各種會議,如JavaOne,SEI Architecture Technology Conference(SATURN),IT Architect Conference(ITARC),No Fluff Just Stuff,NoSQL Now和Project World Conference等。Srini還在InfoQ,The ServerSide,OReilly Network(ONJava),DevX Java,java.net以及JavaWorld等網站上發表過很多關于軟件系統架構、安全和風險管理以及NoSQL數據庫等方面的文章。他還是InfoQ NoSQL數據庫社區的責任編輯( http://www.infoq.com/author/Srini-Penchikala)。
查看英文原文: Big Data Processing with Apache Spark - Part 3: Spark Streaming
感謝百占輝對本文的審校。
給InfoQ中文站投稿或者參與內容翻譯工作,請郵件至editors@cn.infoq.com。也歡迎大家通過新浪微博(@InfoQ,@丁曉昀),微信(微信號: InfoQChina )關注我們。