大眾點評的大數據實踐
2011 年小規模試水
這一階段的主要工作是建立了一個小的集群,并導入了少量用戶進行測試。為了滿足用戶的需求,我們還調研了任務調度系統和數據交換系統。
我們使用的版本是當時最新的穩定版,Hadoop 0.20.203 和 Hive 0.7.1。此后經歷過多次升級與 Bugfix。現在使用的是 Hadoop 1.0.3+ 自有 Patch 與 Hive 0.9+ 自有 Patch。考慮到人手不足及自己的 Patch 不多等問題,我們采取的策略是,以 Apache 的穩定版本為基礎,盡量將自己的修改提交到社區,并且應用這些還沒有被接受的 Patch。因為現在 Hadoop 生態圈中還沒有出現一個類似 Red Hat 地位的公司,我們也不希望被鎖定在某個特定的發行版上,更重要的是 Apache Jira 與 Maillist 依然是獲取 Hadoop 相關知識、解決 Hadoop 相關問題最好的地方(Cloudera 為 CDH 建立了私有的 Jira,但人氣不足),所以沒有采用 Cloudera 或者 Hortonworks 的發行版。目前我們正對 Hadoop 2.1.0 進行測試。
在前期,我們團隊的主要工作是 ops+solution,現在 DBA 已接手了很大一部分 ops 的工作,我們正在轉向 solution+dev 的工作。
我們使用 Puppet 管理整個集群,用 Ganglia 和 Zabbix 做監控與報警。
集群搭建好,用戶便開始使用,面臨的第一個問題是需要任務級別的調度、報警和工作流服務。當用戶的任務出現異常或其他情況時,需要以郵件或者短 信的方式通知用戶。而且用戶的任務間可能有復雜的依賴關系,需要工作流系統來描述任務間的依賴關系。我們首先將目光投向開源項目 Apache Oozie。Oozie 是 Apache 開發的工作流引擎,以 XML 的方式描述任務及任務間的依賴,功能強大。但在測試后,發現 Oozie 并不是一個很好的選擇。
Oozie 采用 XML 作為任務的配置,特別是對于 MapReduce Job,需要在 XML 里配置 Map、Reduce 類、輸入輸出路徑、Distributed Cache 和各種參數。在運行時,先由 Oozie 提交一個 Map only 的 Job,在這個 Job 的 Map 里,再拼裝用戶的 Job,通過 JobClient 提交給 JobTracker。相對于 Java 編寫的 Job Runner,這種 XML 的方式缺乏靈活性,而且難以調試和維護。先提交一個 Job,再由這個 Job 提交真正 Job 的設計,我個人認為相當不優雅。
另一個問題在于,公司內的很多用戶,希望調度系統不僅可以調度 Hadoop 任務,也可以調度單機任務,甚至 Spring 容器里的任務,而 Oozie 并不支持 Hadoop 集群之外的任務。
所以我們轉而自行開發調度系統 Taurus(https://github.com/dianping/taurus)。Taurus 是一個調度系統, 通過時間依賴與任務依賴,觸發任務的執行,并通過任務間的依賴管理將任務組織成工作流;支持 Hadoop/Hive Job、Spring 容器里的任務及一般性任務的調度/監控。
圖 1 Taurus 的結構圖
圖 1 是 Taurus 的結構圖,Taurus 的主節點稱為 Master,Web 界面與 Master 在一起。用戶在 Web 界面上創建任務后,寫入 MySQL 做持久化存儲,當 Master 判斷任務觸發的條件滿足時,則從 MySQL 中讀出任務信息,寫入 ZooKeeper;Agent 部署在用戶的機器上,觀察 ZooKeeper 上的變化,獲得任務信息,啟動任務。Taurus 在 2012 年中上線。
另一個迫切需求是數據交換系統。用戶需要將 MySQL、MongoDB 甚至文件中的數據導入到 HDFS 上進行分析。另外一些用戶要將 HDFS 中生成的數據再導入 MySQL 作為報表展現或者供在線系統使用。
我們首先調研了 Apache Sqoop,它主要用于 HDFS 與關系型數據庫間的數據傳輸。經過測試,發現 Sqoop 的主要問題在于數據的一致性。Sqoop 采用 MapReduce Job 進行數據庫的插入,而 Hadoop 自帶 Task 的重試機制,當一個 Task 失敗,會自動重啟這個 Task。這是一個很好的特性,大大提高了 Hadoop 的容錯能力,但對于數據庫插入操作,卻帶來了麻煩。
考慮有 10 個 Map,每個 Map 插入十分之一的數據,如果有一個 Map 插入到一半時 failed,再通過 Task rerun 執行成功,那么 fail 那次插入的一半數據就重復了,這在很多應用場景下是不可接受的。而且 Sqoop 不支持 MongoDB 和 MySQL 之間的數據交換,但公司內卻有這需求。最終我們參考淘寶的 DataX,于 2011 年底開始設計并開發了 Wormhole。之所以采用自行開發而沒有直接使用 DataX 主要出于維護上的考慮,而且 DataX 并未形成良好的社區。
2012 年大規模應用
2012 年,出于成本、穩定性與源碼級別維護性的考慮,公司的 Data Warehouse 系統由商業的 OLAP 數據庫轉向 Hadoop/Hive。2012 年初,Wormhole 開發完成;之后 Taurus 也上線部署;大量應用接入到 Hadoop 平臺上。為了保證數據的安全性,我們開啟了 Hadoop 的 Security 特性。為了提高數據的壓縮率,我們將默認存儲格式替換為 RCFile,并開發了 Hive Web 供公司內部使用。2012 年底,我們開始調研 HBase。
圖 2 Wormhole 的結構圖
Wormhole(https://github.com /dianping/wormhole) 是一個結構化數據傳輸工具,用于解決多種異構數據源間的數據交換,具有高效、易擴展等特點,由 Reader、Storage、Writer 三部分組成(如圖 2 所示)。Reader 是個線程池,可以啟動多個 Reader 線程從數據源讀出數據,寫入 Storage。Writer 也是線程池,多線程的 Writer 不僅用于提高吞吐量,還用于寫入多個目的地。Storage 是個雙緩沖隊列,如果使用一讀多寫,則每個目的地都擁有自己的 Storage。
當寫入過程出錯時,將自動執行用戶配置的 Rollback 方法,消除錯誤狀態,從而保證數據的完整性。通過開發不同的 Reader 和 Writer 插件,如 MySQL、MongoDB、Hive、HDFS、SFTP 和 Salesforce,我們就可以支持多種數據源間的數據交換。Wormhole 在大眾點評內部得到了大量使用,獲得了廣泛好評。
隨著越來越多的部門接入 Hadoop,特別是數據倉庫(DW)部門接入后,我們對數據的安全性需求變得更為迫切。而 Hadoop 默認采用 Simple 的用戶認證模式,具有很大的安全風險。
默認的 Simple 認證模式,會在 Hadoop 的客戶端執行 whoami 命令,并以 whoami 命令的形式返回結果,作為訪問 Hadoop 的用戶名(準確地說,是以 whoami 的形式返回結果,作為 Hadoop RPC 的 userGroupInformation 參數發起 RPC Call)。這樣會產生以下三個問題。
(1)User Authentication。假設有賬號A和賬號B,分別在 Host1 和 Host2 上。如果惡意用戶在 Host2 上建立了一個同名的賬號A,那么通過 RPC Call 獲得的 UGI 就和真正的賬號A相同,偽造了賬號A的身份。用這種方式,惡意用戶可以訪問/修改其他用戶的數據。
(2)Service Authentication。Hadoop 采用主從結構,如 NameNode-DataNode、JobTracker-Tasktracker。Slave 節點啟動時,主動連接 Master 節點。Slave 到 Master 的連接過程,沒有經過認證。假設某個用戶在某臺非 Hadoop 機器上,錯誤地啟動了一個 Slave 實例,那么也會連接到 Master;Master 會為它分配任務/數據,可能會影響任務的執行。
(3)可管理性。任何可以連到 Master 節點的機器,都可以請求集群的服務,訪問 HDFS,運行 Hadoop Job,無法對用戶的訪問進行控制。
從 Hadoop 0.20.203 開始,社區開發了 Hadoop Security,實現了基于 Kerberos 的 Authentication。任何訪問 Hadoop 的用戶,都必須持有 KDC(Key Distribution Center)發布的 Ticket 或者 Keytab File(準確地說,是 Ticket Granting Ticket),才能調用 Hadoop 的服務。用戶通過密碼,獲取 Ticket,Hadoop Client 在發起 RPC Call 時讀取 Ticket 的內容,使用其中的 Principal 字段,作為 RPC Call 的 UserGroupInformation 參數,解決了問題(1)。Hadoop 的任何 Daemon 進程在啟動時,都需要使用 Keytab File 做 Authentication。因為 Keytab File 的分發是由管理員控制的,所以解決了問題(2)。最后,不論是 Ticket,還是 Keytab File,都由 KDC 管理/生成,而 KDC 由管理員控制,解決了問題(3)。
在使用了 Hadoop Security 之后,只有通過了身份認證的用戶才能訪問 Hadoop,大大增強了數據的安全性和集群的可管理性。之后我們基于 Hadoop Secuirty,與 DW 部門一起開發了 ACL 系統,用戶可以自助申請 Hive 上表的權限。在申請通過審批工作流之后,就可以訪問了。
JDBC 是一種很常用的數據訪問接口,Hive 自帶了 Hive Server,可以接受 Hive JDBC Driver 的連接。實際上,Hive JDBC Driver 是將 JDBC 的請求轉化為 Thrift Call 發給 Hive Server,再由 Hive Server 將 Job 啟動起來。但 Hive 自帶的 Hive Server 并不支持 Security,默認會使用啟動 Hive Server 的用戶作為 Job 的 owner 提交到 Hadoop,造成安全漏洞。因此,我們自己開發了 Hive Server 的 Security,解決了這個問題。
但在 Hive Server 的使用過程中,我們發現 Hive Server 并不穩定,而且存在內存泄漏。更嚴重的是由于 Hive Server 自身的設計缺陷,不能很好地應對并發訪問的情況,所以我們現在并不推薦使用 Hive JDBC 的訪問方式。
社區后來重新開發了 Hive Server 2,解決了并發的問題,我們正在對 Hive Server 2 進行測試。
有一些同事,特別是 BI 的同事,不熟悉以 CLI 的方式使用 Hive,希望 Hive 可以有個 GUI 界面。在上線 Hive Server 之后,我們調研了開源的 SQL GUI Client——Squirrel,可惜使用 Squirrel 訪問 Hive 存在一些問題。
- 辦公網與線上環境是隔離的,在辦公機器上運行的 Squirrel 無法連到線上環境的 Hive Server。
- Hive 會返回大量的數據,特別是當用戶對于 Hive 返回的數據量沒有預估的情況下,Squirrel 會吃掉大量的內存,然后 Out of Memory 掛掉。
- Hive JDBC 實現的 JDBC 不完整,導致 Squirrel 的 GUI 中只有一部分功能可用,用戶體驗非常差。
基于以上考慮,我們自己開發了 Hive Web,讓用戶通過瀏覽器就可以使用 Hive。Hive Web 最初是作為大眾點評第一屆 Hackathon 的一個項目被開發出來的,技術上很簡單,但獲得了良好的反響。現在 Hive Web 已經發展成了一個 RESTful 的 Service,稱為 Polestar(https://github.com/dianping /polestar)。
圖 3 Polestar 的結構
圖 3 是 Polestar 的結構圖。目前 Hive Web 只是一個 GWT 的前端,通過 HAProxy 將 RESTfull Call 分發到執行引擎 Worker 執行。Worker 將自身的狀態保存在 MySQL,將數據保存在 HDFS,并使用 JSON 返回數據或數據在 HDFS 的路徑。我們還將 Shark 與 Hive Web 集成到了一起,用戶可以選擇以 Hive 或者 Shark 執行 Query。
一開始我們使用 LZO 作為存儲格式,使大文件可以在 MapReduce 處理中被切分,提高并行度。但 LZO 的壓縮比不夠高,按照我們的測試,Lzo 壓縮的文件,壓縮比基本只有 Gz 的一半。
經過調研,我們將默認存儲格式替換成 RCFile,在 RCFile 內部再使用 Gz 壓縮,這樣既可保持文件可切分的特性,同時又可獲得 Gz 的高壓縮比,而且因為 RCFile 是一種列存儲的格式,所以對于不需要的字段就不用從I/O讀入,從而提高了性能。圖 4 顯示了將 Nginx 數據分別用 Lzo、RCFile+Gz、RCFfile+Lzo 壓縮,再不斷增加 Select 的 Column 數,在 Hive 上消耗的 CPU 時間(越小越好)。
圖 4 幾種壓縮方式在 Hive 上消耗的 CPU 時間
但 RCFile 的讀寫需要知道數據的 Schema,而且需要熟悉 Hive 的 Ser/De 接口。為了讓 MapReduce Job 能方便地訪問 RCFile,我們使用了 Apache Hcatalog。
社區又針對 Hive 0.11 開發了 ORCFile,我們正在對 ORCFile 進行測試。
隨著 非死book、淘寶等大公司成功地在生產環境應用 HBase,HBase 越來越受到大家的關注,我們也開始對 HBase 進行測試。通過測試我們發現 HBase 非常依賴參數的調整,在默認配置下,HBase 能獲得很好的寫性能,但讀性能不是特別出色。通過調整 HBase 的參數,在 5 臺機器的 HBase 集群上,對于 1KB 大小的數據,也能獲得 5 萬左右的 TPS。在 HBase 0.94 之后,HBase 已經優化了默認配置。
原來我們希望 HBase 集群與主 Hadoop 集群共享 HDFS,這樣可以簡化運維成本。但在測試中,發現即使主 Hadoop 集群上沒有任何負載,HBase 的性能也很糟糕。我們認為,這是由于大量數據屬于遠程讀寫所引起的。所以我們現在的 HBase 集群都是單獨部署的。并且通過封裝 HBase Client 與 Master-Slave Replication,使用 2 套 HBase 集群實現了 HBase 的 HA,用來支撐線上業務。
2013 年持續演進
在建立了公司主要的大數據架構后,我們上線了 HBase 的應用,并引入 Spark/Shark 以提高 Ad Hoc Query 的執行時間,并調研分布式日志收集系統,來取代手工腳本做日志導入。
現在 HBase 上線的應用主要有 OpenAPI 和手機團購推薦。OpenAPI 類似于 HBase 的典型應用 Click Stream,將開放平臺開發者的訪問日志記錄在 HBase 中,通過 Scan 操作,查詢開發者在一段時間內的 Log,但這一功能目前還沒有對外開放。手機團購推薦是一個典型的 KVDB 用法,將用戶的歷史訪問行為記錄在 HBase 中,當用戶使用手機端訪問時,從 HBase 獲得用戶的歷史行為數據,做團購推薦。
當 Hive 大規模使用之后,特別是原來使用 OLAP 數據庫的 BI 部門的同事轉入后,一個越來越大的抱怨就是 Hive 的執行速度。對于離線的 ETL 任務,Hadoop/Hive 是一個良好的選擇,但動輒分鐘級的響應時間,使得 Ad Hoc Query 的用戶難以忍受。為了提高 Ad Hoc Query 的響應時間,我們將目光轉向了 Spark/Shark。
Spark 是美國加州大學伯克利分校 AMPLab 開發的分布式計算系統,基于 RDD(Resilient Distributed Dataset),主要使用內存而不是硬盤,可以很好地支持迭代計算。因為是一個基于 Memory 的系統,所以在數據量能夠放進 Memory 的情況下,能夠大幅縮短響應時間。Shark 類似于 Hive,將 SQL 解析為 Spark 任務,并且 Shark 復用了大量 Hive 的已有代碼。
在 Shark 接入之后,大大降低了 Ad Hoc Query 的執行時間。比如 SQL 語句:
select host, count(1) from HIPPOLOG where dt = '2013-08-28' group by host order by host desc;
在 Hive 執行的時間是 352 秒,而 Shark 只需要 60~70 秒。但對于 Memory 中放不下的大數據量,Shark 反而會變慢。
目前用戶需要在 Hive Web 中選擇使用 Hive 還是 Shark,未來我們會在 Hive 中添加 Semantic-AnalysisHook,通過解析用戶提交的 Query,根據數據量的大小,自動選擇 Hive 或者 Shark。另外,因為我們目前使用的是 Hadoop 1,不支持 YARN,所以我們單獨部署了一個小集群用于 Shark 任務的執行。
Wormhole 解決了結構化數據的交換問題,但對于非結構化數據,例如各種日志,并不適合。我們一直采用腳本或用戶程序直接寫 HDFS 的方式將用戶的 Log 導入 HDFS。缺點是,需要一定的開發和維護成本。我們希望使用 Apache Flume 解決這個問題,但在測試了 Flume 之后,發現了 Flume 存在一些問題:Flume 不能保證端到端的數據完整性,數據可能丟失,也可能重復。
例如,Flume 的 HDFSsink 在數據寫入/讀出 Channel 時,都有 Transcation 的保證。當 Transaction 失敗時,會回滾,然后重試。但由于 HDFS 不可修改文件的內容,假設有 1 萬行數據要寫入 HDFS,而在寫入 5000 行時,網絡出現問題導致寫入失敗,Transaction 回滾,然后重寫這 10000 條記錄成功,就會導致第一次寫入的 5000 行重復。我們試圖修正 Flume 的這些問題,但由于這些問題是設計上的,并不能通過簡單的 Bugfix 來解決,所以我們轉而開發 Blackhole 系統將數據流導入 HDFS。目前 Blackhole 正在開發中。
總結
圖 5 是各系統總體結構圖,深藍部分為自行開發的系統。
圖 5 大眾點評各系統總體結構圖
在這 2 年多的 Hadoop 實踐中,我們得到了一些寶貴經驗。
- 建設一支強大的技術團隊是至關重要的。Hadoop 的生態系統,還處在快速演化中,而且文檔相當匱乏。只有具備足夠強的技術實力,才能用好開源軟件,并在開源軟件不能滿足需求時,自行開發解決問題。
- 要立足于解決用戶的需求。用戶需要的東西,會很容易被用戶接受,并推廣開來;某些東西技術上很簡單,但可以解決用戶的大問題。
- 對用戶的培訓,非常重要。
作者房明,大眾點評網平臺架構組高級工程師,Apache Contributor。2011 年加入點評網,目前負責大數據處理的基礎架構及所有 Hadoop 相關技術的研發。
<span id="shareA4" class="fl"> </span>