變不可能為可能,Tachyon幫助Spark變小時級任務到秒

jopen 8年前發布 | 17K 次閱讀 Spark 數據庫

本文作者是Gianmario Spacagna和Harry Powell,Barclays的數據科學家。

集群計算和大數據技術已經取得了很多進展,不過現在很多大數據應用使用的還是HDFS這一分布式分件系統。HDFS是一個基于磁盤的文件系統,將數據存儲在磁盤上有一些問題,比如說面臨法律的監管,由磁盤讀寫造成的延遲也比較高。要避免這些問題可以將處理過的數據暫時放在內存中。Tachyon就可以幫你讓這些數據長期處于內存中并且在不同應用之間共享。

在巴克萊我們并沒有把數據存儲在HDFS上,而是使用了RDMBS關系型數據庫,而且我們還開發了一套讓Spark從RDBMS直接讀取數據的流程。我們作為讀取數據的一方對于數據庫的schema并不完全清楚,所以我們先讀取為動態類型的Spark DataFrame,分析了數據結構和內容之后再轉換為RDD。

這套流程有一個弊端。我們的數據集比較大,所以從RDBMS讀取數據要花挺長時間。按理說我們不應該頻繁地讀取數據,但Spark緩存的數據一崩潰一重啟就丟了。這時候就得重新讀取數據一次,這么來一次我們的系統就得掛一半個小時,一天重讀個幾次也是很常見的。哪怕我們做完了數據的映射之后只要運行Spark job也還得重新讀取數據,加入新特性,改模型,測試的時候都得這么干。

所以我們找到了Tachyon。Tachyon現在已經改名為Alluxio,它是一個數據存儲層,它讓所有的Spark應用可以直接通過文件API來讀取數據。既方便與現有應用的集成也很簡單。

現有架構的問題

如前所述,最主要的問題就是數據的加載。雖然Spark有緩存功能,但當我們重啟context,更新依賴或者重新提交job的時候緩存的數據就丟失了,只有從數據庫中重新加載這一個辦法。

下面的圖表是加載數據到6個Spark節點所需要花費的時間(以分鐘計)。橫坐標代表數據的行數,綠色是晚上六點數據庫比較閑的時候,灰色是早晨十點數據庫使用率比較高的時候而藍色是下午兩點數據庫非常忙的時候。

變不可能為可能,Tachyon幫助Spark變小時級任務到秒

我們可以看出加載數據的時間從幾分鐘到幾小時不等。考慮到我們一天要重啟很多次,光靠Spark的緩存肯定是不夠的。我們想要達到的目標有下面三點:

? 緩存DataFrame原始數據用于尋找正確的映射配置

? 緩存RDD用于分析

? 快速讀取中間結果并在不同應用之間共享數據

這三點匯成一句話其實就是要一個內存存儲系統。

Tachyon

Tachyon不單解決了我們數據存儲的問題還將目前的部署速度提升到了一個新臺階。Tachyon作為一種內存分布式文件系統,可以存儲任何文本格式或Parquet、Avro和Kryo等高效數據類型。我們還可以將結合進Snappy或LZO等壓縮算法來減少對內存的占用。

與Spark應用的集成非常簡單,只需調用DataFrame和RDD的加載存儲API并指定路徑URL和Tachyon協議即可。

我們存儲原始數據的目的是快速地迭代探索式分析和測試。現在我們可以直接從原始數據來構建最簡可行產品而不必在數據的處理上多花時間。下面是我們部署Tachyon之后的工作流程。

變不可能為可能,Tachyon幫助Spark變小時級任務到秒

橙色箭頭代表我們將數據的中間結果存儲到Tachyon以方便以后讀取。

Tachyon的配置

在巴克萊我們將Tachyon配置為與tmpfs文件系統配合(unix系統中的路徑為/dev/shm)。在Tachyon主節點上的配置由下面五步組成:

1.修改tachyon-env.sh配置文件

export TACHYON_WORKER_MEMORY_SIZE=${TACHYON_WORKER_MEMORY_SIZE:-24GB}

TACHYON_JAVA_OPTS中我們則保留默認配置:

-Dtachyon.worker.tieredstore.level0.
-Dtachyon.worker.tieredstore.level0.dirs.path=${TACHYON_RAM_FOLDER}
-Dtachyon.worker.tieredstore.level0.dirs.quota=${TACHYON_WORKER_MEMORY_SIZE}

2.將配置復制到worker節點中

./bin/tachyon copyDir ./conf/

3.格式化Tachyon

./bin/tachyon format

4.部署Tachyon。注意NoMount選項,NoMount不需要root權限:

./bin/tachyon-start.sh all NoMount

下面是我們整個架構的示意圖:

變不可能為可能,Tachyon幫助Spark變小時級任務到秒

Tachyon與Spark的結合使用

Tachyon中數據的讀寫非常簡單,因為它所提供的文件API與Java類似。往Tachyon中寫DataFrame:

dataframe.write.save("tachyon://master_ip:port/mydata/mydataframe.parquet")

從Tachyon中讀取DataFrame:

val dataframe: DataFrame = sqlContext.read.load("tachyon://master_ip:port/mydata/mydataframe.parquet")

寫入RDD:

rdd.saveAsObjectFile("tachyon://master_ip:port/mydata/myrdd.object")

讀取RDD:

val rdd: RDD[MyCaseClass] = sc.objectFile[MyCaseClass] ("tachyon://master_ip:port/mydata/myrdd.object")

這里要注意MyCaseClass的類型必須與寫入時一致,不然會出錯。

效果

我們使用Spark、Scala、DataFrame、JDBC、Parquet、Kryo和Tachyon創建出了一套數據項目流程,它具有擴展性好和速度快等優點,質量也足以直接部署到生產環境中。

Tachyon使我們能夠直接讀取原始數據而不必從數據庫中加載。數據寫入Tachyon之后也可以迅速開始分析,提高了工作的效率。

使用Tachyon將數據存儲在內存中讀寫只需幾秒鐘,所以在我們的流程中擴展幾乎不影響性能。迭代一次所需的時間從以前的幾個小時降低到了現在的幾秒鐘。

未來展望

Tachyon自身還在發展,我們對Tachyon的使用也還在進一步的探索中,所以有一些限制是難免的。下面我們就列出了一些還有待提高的地方。

? 我們沒有指定底層的存儲設備所以Tachyon所能夠存儲的數據量受到所分配空間的限制。Tachyon提供了分層存儲功能來解決這個問題。

? 設置JDBC驅動、分區策略和類映射還比較麻煩而且不夠易用。

? Spark和Tachyon共享內存,所以為了避免重復和過度的垃圾回收還需要做一些調整。

? 如果Tachyon worker掛了,數據就會丟失。因為我們沒有使用分層存儲所以Tachyon自己不會重新加載數據。對巴克萊來說還好但對于其他企業來說可能還需要進一步的改進。

總的來說我們還是很看好Tachyon,它應該會對企業中的數據項目有所影響。

原文鏈接: Making the Impossible Possible with Tachyon: Accelerate Spark Jobs from Hours to Seconds

來自: http://www.iteye.com/news/31395

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!