Apache Beam實戰指南之基礎入門
前言:大數據 2.0 時代不期而至
隨著大數據 2.0 時代悄然到來,大數據從簡單的批處理擴展到了實時處理、流處理、交互式查詢和機器學習應用。早期的處理模型 (Map/Reduce) 早已經力不從心,而且也很難應用到處理流程長且復雜的數據流水線上。另外,近年來涌現出諸多大數據應用組件,如 HBase、Hive、Kafka、Spark、Flink 等。開發者經常要用到不同的技術、框架、API、開發語言和 SDK 來應對復雜應用的開發。這大大增加了選擇合適工具和框架的難度,開發者想要將所有的大數據組件熟練運用幾乎是一項不可能完成的任務。
面對這種情況,Google 在 2016 年 2 月宣布將大數據流水線產品(Google DataFlow)貢獻給 Apache 基金會孵化,2017 年 1 月 Apache 對外宣布開源 Apache Beam,2017 年 5 月迎來了它的第一個穩定版本 2.0.0。在國內,大部分開發者對于 Beam 還缺乏了解,社區中文資料也比較少。InfoQ 期望通過 Apache Beam 實戰指南系列文章 推動 Apache Beam 在國內的普及。
本文將簡要介紹 Apache Beam 的發展歷史、應用場景、模型和運行流程、SDKs 和 Beam 的應用示例。歡迎加入 Beam 中文社區深入討論和交流。
概述
大數據處理領域的一大問題是:開發者經常要用到很多不同的技術、框架、API、開發語言和 SDK。取決于需要完成的是什么任務,以及在什么情況下進行,開發者很可能會用 MapReduce 進行批處理,用 Apache Spark SQL 進行交互請求(interactive queries),用 Apache Flink 進行實時流處理,還有可能用到基于云端的機器學習框架。
近兩年涌現的開源大潮,為大數據開發者提供了十分富余的工具。但這同時也增加了開發者選擇合適工具的難度,尤其對于新入行的開發者來說。這很可能拖慢、甚至阻礙開源工具的發展:把各種開源框架、工具、庫、平臺人工整合到一起所需工作之復雜,是大數據開發者常有的抱怨之一,也是他們支持專有大數據平臺的首要原因。
Apache Beam 發展歷史
Beam 在 2016 年 2 月成為 Apache 孵化器項目,并在 2016 年 12 月升級成為 Apache 基金會的頂級項目。通過十五個月的努力,一個稍顯混亂的代碼庫,從多個組織合并,已發展成為數據處理的通用引擎,集成多個處理數據框架,可以做到跨環境。
Beam 經過三個孵化器版本和三個后孵化器版本的演化和改進,最終在 2017 年 5 月 17 日迎來了它的第一個穩定版 2.0.0。發布穩定版本 3 個月以來,Apache Beam 已經出現明顯的增長,無論是通過官方還是社區的貢獻數量。Apache Beam 在谷歌云方面也已經展示出了“才干”。
Beam 2.0.0 改進了用戶體驗,重點在于框架跨環境的無縫移植能力,這些執行環境包括執行引擎、操作系統、本地集群、云端以及數據存儲系統。Beam 的其他特性還包括如下幾點:
-
API 穩定性和對未來版本的兼容性。
-
有狀態的數據處理模式,高效的支持依賴于數據的計算。
-
支持用戶擴展的文件系統,支持 Hadoop 分布式發文件系統及其他。
-
提供了一個度量指標系統,可用于跟蹤管道的執行狀況。
網上已經有很多人寫過 Beam 2.0.0 版本之前的資料,但是 2.0.0 版本后 API 很多寫法變動較大,本文將帶著大家從零基礎到 Apache Beam 入門。
Apache Beam 應用場景
Google Cloud、PayPal、Talend 等公司都在使用 Beam,國內包括阿里巴巴、百度、金山、蘇寧、九次方大數據、360、慧聚數通信息技術有限公司等也在使用 Beam,同時還有一些大數據公司的架構師或研發人員正在一起進行研究。Apache Beam 中文社區正在集成一些工作中的 runners 和 sdk IO,包括人工智能、機器學習和時序數據庫等一些功能。
以下為應用場景的幾個例子:
-
Beam 可以用于 ETL Job 任務
Beam 的數據可以通過 SDKs 的 IO 接入,通過管道可以用后面的 Runners 做清洗。
-
Beam 數據倉庫快速切換、跨倉庫
由于 Beam 的數據源是多樣 IO,所以用 Beam 可以快速切換任何數據倉庫。
-
Beam 計算處理平臺切換、跨平臺
Runners 目前提供了 3-4 種可以切換的平臺,隨著 Beam 的強大應該會有更多的平臺提供給大家使用。
Apache Beam 運行流程
4-1 數據處理流程
如圖 4-1 所示,Apache Beam 大體運行流程分成三大部分:
-
Modes
Modes 是 Beam 的模型或叫數據來源的 IO,它是由多種數據源或倉庫的 IO 組成,數據源支持批處理和流處理。
-
Pipeline
Pipeline 是 Beam 的管道,所有的批處理或流處理都要通過這個管道把數據傳輸到后端的計算平臺。這個管道現在是唯一的。數據源可以切換多種,計算平臺或處理平臺也支持多種。需要注意的是,管道只有一條,它的作用是連接數據和 Runtimes 平臺。
-
Runtimes
Runtimes 是大數據計算或處理平臺,目前支持 Apache Flink、Apache Spark、Direct Pipeline 和 Google Clound Dataflow 四種。其中 Apache Flink 和 Apache Spark 同時支持本地和云端。Direct Pipeline 僅支持本地,Google Clound Dataflow 僅支持云端。除此之外,后期 Beam 國外研發團隊還會集成其他大數據計算平臺。由于谷歌未進入中國,目前國內開發人員在工作中對谷歌云的使用應該不是很多,主要以前兩種為主。為了使讀者讀完文章后能快速學習且更貼近實際工作環境,后續文章中我會以前兩種作為大數據計算或處理平臺進行演示。
Beam Model 及其工作流程
Beam Model 指的是 Beam 的編程范式,即 Beam SDK 背后的設計思想。在介紹 Beam Model 之前,先簡要介紹一下 Beam Model 要處理的問題域與一些基本概念。
-
數據源類型。分布式數據來源類型一般可以分為兩類,有界的數據集和無界的數據流。有界的數據集,比如一個 Ceph 中的文件,一個 Mongodb 表等,特點是數據已經存在,數據集有已知的、固定的大小,一般存在磁盤上,不會突然消失。而無界的數據流,比如 Kafka 中流過來的數據流,這種數據的特點是數據動態流入、沒有邊界、無法全部持久化到磁盤上。Beam 框架設計時需要針對這兩種數據的處理進行考慮,即批處理和流處理。
-
時間。分布式框架的時間處理有兩種,一種是全量計算,另一種是部分增量計算。我給大家舉個例子:例如我們玩“王者農藥”游戲,游戲的數據需要實時地流向服務器,掉血情況會隨著時間實時變化,但是排行榜的數據則是全部玩家在一定時間內的排名,例如一周或一個月。Beam 針對這兩種情況都設計了對應的處理方式。
亂序。對于流處理框架處理的數據流來說,數據到達大體分兩種,一種是按照 Process Time 定義時間窗口,這種不用考慮亂序問題,因為都是關閉當前窗口后才進行下一個窗口操作,需要等待,所以執行都是有序的。而另一種,Event Time 定義的時間窗口則不需要等待,可能當前操作還沒有處理完,就直接執行下一個操作,造成消息順序處理但結果不是按順序排序了。例如我們的訂單消息,采用了分布式處理,如果下單操作所屬服務器處理速度比較慢,而用戶支付的服務器速度非常快,這時最后的訂單操作時間軸就會出現一種情況,下單在支付的后面。對于這種情況,如何確定遲到數據,以及對于遲到數據如何處理通常是很麻煩的事情。
Beam Model 處理的目標數據是無界的時間亂序數據流,不考慮時間順序或有界的數據集可看做是無界亂序數據流的一個特例。Beam Model 從下面四個維度歸納了用戶在進行數據處理的時候需要考慮的問題:
-
What。如何對數據進行計算?例如,機器學習中訓練學習模型可以用 Sum 或者 Join 等。在 Beam SDK 中由 Pipeline 中的操作符指定。
-
Where。數據在什么范圍中計算?例如,基于 Process-Time 的時間窗口、基于 Event-Time 的時間窗口、滑動窗口等等。在 Beam SDK 中由 Pipeline 的窗口指定。
-
When。何時輸出計算結果?例如,在 1 小時的 Event-Time 時間窗口中,每隔 1 分鐘將當前窗口計算結果輸出。在 Beam SDK 中由 Pipeline 的 Watermark 和觸發器指定。
-
How。遲到數據如何處理?例如,將遲到數據計算增量結果輸出,或是將遲到數據計算結果和窗口內數據計算結果合并成全量結果輸出。在 Beam SDK 中由 Accumulation 指定。
Beam Model 將“WWWH”四個維度抽象出來組成了 Beam SDK,用戶在基于 Beam SDK 構建數據處理業務邏輯時,每一步只需要根據業務需求按照這四個維度調用具體的 API,即可生成分布式數據處理 Pipeline,并提交到具體執行引擎上執行。“WWWH”四個維度只是從業務的角度看待問題,并不是全部適用于自己的業務。做技術架構一定要結合自己的業務使用相應的技術特性或框架。Beam 做為“一統”的框架,為開發者帶來了方便。
Beam SDKs
Beam SDK 給上層應用的開發者提供了一個統一的編程接口,開發者不需要了解底層的具體的大數據平臺的開發接口是什么,直接通過 Beam SDK 的接口就可以開發數據處理的加工流程,不管輸入是用于批處理的有界數據集,還是流式的無界數據集。對于這兩類輸入數據,Beam SDK 都使用相同的類來表現,并且使用相同的轉換操作進行處理。Beam SDK 擁有不同編程語言的實現,目前已經完整地提供了 Java 的 SDK,Python 的 SDK 還在開發中,相信未來會發布更多不同編程語言的 SDK。
Beam 2.0 的 SDKs 目前有:
Amqp:高級消息隊列協議。
Cassandra:Cassandra 是一個 NoSQL 列族(column family)實現,使用由 Amazon Dynamo 引入的架構方面的特性來支持 Big Table 數據模型。Cassandra 的一些優勢如下所示:
-
高度可擴展性和高度可用性,沒有單點故障
-
NoSQL 列族實現
-
非常高的寫入吞吐量和良好的讀取吞吐量
-
類似 SQL 的查詢語言(從 0.8 版本起),并通過二級索引支持搜索
-
可調節的一致性和對復制的支持靈活的模式
Elasticesarch:一個實時的分布式搜索引擎。
Google-cloud-platform:谷歌云 IO。
Hadoop-file-system:操作 Hadoop 文件系統的 IO。
Hadoop-hbase:操作 Hadoop 上的 Hbase 的接口 IO。
Hcatalog:Hcatalog 是 Apache 開源的對于表和底層數據管理統一服務平臺。
Jdbc:連接各種數據庫的數據庫連接器。
Jms:Java 消息服務(Java Message Service,簡稱 JMS)是用于訪問企業消息系統的開發商中立的 API。企業消息系統可以協助應用軟件通過網絡進行消息交互。JMS 在其中扮演的角色與 JDBC 很相似,正如 JDBC 提供了一套用于訪問各種不同關系數據庫的公共 API,JMS 也提供了獨立于特定廠商的企業消息系統訪問方式。
Kafka:處理流數據的輕量級大數據消息系統,或叫消息總線。
Kinesis:對接亞馬遜的服務,可以構建用于處理或分析流數據的自定義應用程序,以滿足特定需求。
Mongodb:MongoDB 是一個基于分布式文件存儲的數據庫。
Mqtt:IBM 開發的一個即時通訊協議。
Solr:亞實時的分布式搜索引擎技術。
xml:一種數據格式。
Beam Pipeline Runners
Beam Pipeline Runner 將用戶用 Beam 模型定義開發的處理流程翻譯成底層的分布式數據處理平臺支持的運行時環境。在運行 Beam 程序時,需要指明底層的正確 Runner 類型,針對不同的大數據平臺,會有不同的 Runner。目前 Flink、Spark、Apex 以及谷歌的 Cloud DataFlow 都有支持 Beam 的 Runner。
需要注意的是,雖然 Apache Beam 社區非常希望所有的 Beam 執行引擎都能夠支持 Beam SDK 定義的功能全集,但是在實際實現中可能無法達到這一期望。例如,基于 MapReduce 的 Runner 顯然很難實現和流處理相關的功能特性。就目前狀態而言,對 Beam 模型支持最好的就是運行于谷歌云平臺之上的 Cloud Dataflow,以及可以用于自建或部署在非谷歌云之上的 Apache Flink。當然,其它的 Runner 也正在迎頭趕上,整個行業也在朝著支持 Beam 模型的方向發展。
Beam 2.0 的 Runners 框架如下:
Apex
誕生于 2015 年 6 月的 Apache Apex,其同樣源自 DataTorrent 及其令人印象深刻的 RTS 平臺,其中包含一套核心處理引擎、儀表板、診斷與監控工具套件外加專門面向數據科學家用戶的圖形流編程系統 dtAssemble。主要用于流處理,常用于物聯網等場景。
Direct-java
本地處理和運行 runner。
Flink_2.10
Flink 是一個針對流數據和批數據的分布式處理引擎。
Gearpump
Gearpump 是一個基于 Akka Actor 的輕量級的實時流計算引擎。如今流平臺需要處理來自各種移動端和物聯網設備的海量數據,系統要能不間斷地提供服務,對數據的處理要能做到不丟失不重復,對各種軟硬件錯誤能平滑處理,對用戶的輸入要能實時響應。除了這些系統層面的需求外,用戶層面的接口還要能做到豐富而靈活,一方面,平臺要提供足夠豐富的基礎設施,能最簡化應用程序的編寫;另一方面,這個平臺應提供具有表現力的編程 API,讓用戶能靈活表達各種計算,并且整個系統可以定制,允許用戶選擇調度策略和部署環境,允許用戶在不同的指標間做折中取舍,以滿足特定的需求。Akka Actor 提供了通信、并發、隔離、容錯的基礎設施,Gearpump 通過把抽象層次提升到 Actor 這一層,屏蔽了底層的細節,專注于流處理需求本身,能更簡單而又高效地解決上述問題。
Dataflow
2016 年 2 月份,谷歌及其合作伙伴向 Apache 捐贈了一大批代碼,創立了孵化中的 Beam 項目(最初叫 Apache Dataflow)。這些代碼中的大部分來自于谷歌 Cloud Dataflow SDK——開發者用來寫流處理和批處理管道(pipelines)的庫,可在任何支持的執行引擎上運行。當時,支持的主要引擎是谷歌 Cloud Dataflow。
Spark
Apache Spark 是一個正在快速成長的開源集群計算系統。Apache Spark 生態系統中的包和框架日益豐富,使得 Spark 能夠執行高級數據分析。Apache Spark 的快速成功得益于它的強大功能和易用性。相比于傳統的 MapReduce 大數據分析,Spark 效率更高、運行時速度更快。Apache Spark 提供了內存中的分布式計算能力,具有 Java、Scala、Python、R 四種編程語言的 API 編程接口。
實戰:開發第一個 Beam 程序
8.1 開發環境
-
下載安裝 JDK 7 或更新的版本,檢測 JAVA_HOME 環境變量。本文示例使用的是 JDK 1.8。
-
下載 maven 并配置,本文示例使用的是 maven-3.3.3。
-
開發環境 myeclipse、Spring Tool Suite 、IntelliJ IDEA,這個可以按照個人喜好,本文示例用的是 STS。
8.2 開發第一個 wordCount 程序并且運行
1 新建一個 maven 項目
2 在 pom.xml 文件中添加兩個 jar 包
3 新建一個 txtIOTest.java
4 因為 Windows 上的 Beam2.0.0 不支持本地路徑,需要部署到 Linux 上,需要打包如圖,此處注意要把依賴 jar 都打包進去。
5 部署 beam.jar 到 Linux 環境中
使用 Xshell 5 登錄虛擬機或者 Linux 系統。用 rz 命令把剛才打包的文件上傳上去。其中虛擬機要安裝上 jdk 并配置好環境變量。
我們可以用輸入 javac 命令測試一下。
我們把 beam.jar 上傳到 /usr/local/ 目錄下面,然后新建一個文件,也就是源文件。命令:touch text.txt 命令:chmod o+rwx text.txt
修改 text.txt 并添加數據。 命令:vi text.txt
運行命令:java -jar beam.jar,生成文件。
用 cat 命令查看文件內容,里面就是統計的結果。
8.3 實戰剖析
我們可以通過以上實戰代碼進一步了解 Beam 的運用原理。
第一件事情是搭建一個管道(Pipeline),例如我們小時候家里澆地用的“水管”。它就是連接水源和處理的橋梁。
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();// 創建管道
第二件事情是讓我們的管道有一個處理框架,也就是我們的 Runtimes 。例如我們接到水要怎么處理,是輸送給我們城市的污水處理廠,還是其他。這個污水處理廠就相當于我們的處理框架,例如現在流行的 Apache Spark 或 Apache Flink。這個要根據自己的業務指定,如下代碼中我指定了本地的處理框架。
pipelineOptions.setRunner(DirectRunner.class);
第三件事情也是 Beam 最后一個重要的地方,就是模型 (Model),通俗點講就是我們的數據來源。如果結合以上第一件和第二件的事情說就是水從哪里來,水的來源可能是河里、可能是污水通道等等。本實例用的是有界固定大小的文本文件。當然 Model 還包含無界數據,例如 kafka 等等,可以根據的需求靈活運用。
pipeline.apply(TextIO.read().from("/usr/local/text.txt")).apply ("ExtractWords", ParDo.of(new DoFn<String, String>() //后省略
最后一步是處理結果,這個比較簡單,可以根據自己的需求處理。希望通過代碼的實戰結合原理剖析可以幫助大家更快地熟悉 Beam 并能夠簡單地運用 Beam。
總結
Apache Beam 是集成了很多數據模型的一個統一化平臺,它為大數據開發工程師頻繁換數據源或多數據源、多計算框架提供了集成統一框架平臺。Apache Beam 社區現在已經集成了數據庫的切換 IO,未來 Beam 中文社區還將為 Beam 集成更多的 Model 和計算框架,為大家提供方便。
作者介紹:張海濤,目前就職于海康威視云基礎平臺,負責云計算大數據的基礎架構設計和中間件的開發,專注云計算大數據方向。Apache Beam 中文社區發起人之一,如果想進一步了解最新 Apache Beam 動態和技術研究成果,請加微信 cyrjkj 入群共同研究和運用。
感謝蔡芳芳對本文的審校。
來自:http://www.infoq.com/cn/articles/apache-beam-in-practice