Spark新愿景:讓深度學習變得更加易于使用
前言
Spark成功的實現了當年的承諾,讓數據處理變得更容易,現在,雄心勃勃的Databricks公司展開了一個新的愿景:讓深度學習變得更容易。 當然牛好吹,也是要做些實際行動的,所有便有了 spark-deep-learning 項目。這件事情已經有很多人嘗試做了,但顯然太淺了,DB公司則做的更深入些。
原理
要做深度學習,肯定不能離開TensorFlow, MXNet之類的。 spark-deep-learning也是如此,嘗試和Tensorflow進行整合。那么如何進行整合呢? 我們知道Tensorflow其實是C++開發的,平時訓練啥的我們主要使用python API。Spark要和TensorFlow 進行整合,那么有三種方式:
- 走Tensorflow的Java API
- 走Tensorflow的Python API
- 通過JNI直接走Tensorflow的C++ API
因為Spark自己也可以使用Python,雖然有性能的上的損耗(據說>30%),但是終究是能跑起來。實際上Spark采用了2和3的結合。 第二條容易理解,第三條則主要依賴于另外一個項目 tensorframes 。這個項目主要是實現tensorflow和spark的互相調用。簡單的來說,在spark的dataframe運算可以通過JNI調用tensorflow來完成,反之Spark的dataframe也可以直接喂給tensorflow(也就是tensorflow可以直接輸入dataframe了)。有了這個之后,spark-deep-learning 則無需太多關注如何進行兩個系統完成交互的功能,而是專注于完成對算法的集成了。
為了給出一個直觀的感受,我們看個示例代碼(來源于官方):
import tensorflow as tf
import tensorframes as tfs
from pyspark.sql import Row
data = [Row(x=float(x)) for x in range(10)]
df = sqlContext.createDataFrame(data)
with tf.Graph().as_default() as g:
# The TensorFlow placeholder that corresponds to column 'x'.
# The shape of the placeholder is automatically inferred from the DataFrame.
x = tfs.block(df, "x")
# The output that adds 3 to x
z = tf.add(x, 3, name='z')
# The resulting dataframe
df2 = tfs.map_blocks(z, df)
The transform is lazy as for most DataFrame operations. This will trigger it:
df2.collect()</code></pre>
在這里,通過tensorframes 我可以對spark dataframe里列使用tensorflow來進行處理。
x = tfs.block(df, "x")
相當于
x = tf.placeholder(shape=..., dtype=..., name='x')
程序自動從df可以知道數據類型。
df2 = tfs.map_blocks(z, df)
則相當于將df 作為tf的feed_dict數據。最終f2.collect 觸發實際的計算。
spark-deep-learning 提出了三個新的東西:
- 首先是,Spark的數據終于可以用DF的方式無縫的喂給Tensorflow/Keras了,而且對Tensorflow/Keras的適配了一套Mllib的庫,方便以Spark Mllib的方式進行編程。當然,為了使得原先是Tensorflow/Keras的用戶感覺爽,如果你使用Python API你也可以完全使用Keras/Tensorflow 的Style來完成代碼的編寫。
- 其次是多個TF模型同時訓練,給的一樣的數據,但是不同的參數,從而充分利用分布式并行計算來選擇最好的模型。
- 另外是模型訓練好后如何集成到Spark里進行使用呢?沒錯,SQL UDF函數,你可以很方便的把一個訓練好的模型注冊成UDF函數,從而實際完成了模型的部署。
方便理解,我們也簡單看看一些代碼:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer
from sparkdl import readImages
from pyspark.sql.functions import lit
//讀取圖片,設置為1分類
tulips_df = readImages(img_dir + "/tulips").withColumn("label", lit(1))
//讀取圖片,設置為2分類
daisy_df = readImages(img_dir + "/daisy").withColumn("label", lit(0))
//構成訓練集
train_df = tulips_train.unionAll(daisy_train)
//使用已經配置好的模型(InceptionV3)
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
//接一個分類器,也就是傳說中的遷移學習
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
//組裝下
p = Pipeline(stages=[featurizer, lr])
//訓練,和Mllib保持了一致
model = p.fit(image_df) # train_images_df is a dataset of images (SpImage) and labels
//預測
df = model.transform(train_df.limit(10)).select("image", "probability", "uri", "label")
predictionAndLabels = df.select("prediction", "label")</code></pre>
整個模型一氣呵成。
如何開發
spark-deep-learning 還處于早期,很多東西還不太完善。
為了方便看源碼以及編寫實際的代碼,你可以clone最新的代碼,然后使用intellij idea 可以很方便的導入進來。導入進來后,添加python framework的支持,然后把根目錄下的python目錄作為source 目錄,接著進入project structured 添加pyspark 的zip(一般放在spark home 里的lib目錄),這樣你在spark-deep-learning里就可以直接做開發了。
spark-deep-learning使用的是spark 2.1.1 以及python 2.7 ,不過我的環境是spark 2.2.0, python 3.6。 所以你需要在build.sbt里第一行修改為
val sparkVer = sys.props.getOrElse("spark.version", "2.2.0")
同時保證你的python為2.7版本(你可以通過一些python的管理工具來完成版本的切換),然后進行編譯:
build/sbt assembly
編譯的過程中會跑單元測試,在spark 2.2.0會報錯,原因是udf函數不能包含“-”,所以你找到對應的幾個測試用例,修改里面的udf函數名稱即可。
編譯好后,你就可以直接寫個腳本,比如:
import os
from pyspark import *
from sparkdl import readImages
os.environ['PYSPARK_PYTHON'] = '/Users/allwefantasy/python2.7/tensorflow/bin/python'
sc = SparkContext.getOrCreate()
image_df = readImages("/Users/allwefantasy/resources/images/flower_photos/daisy/")
image_df.show()</code></pre>
比如我這里簡單的讀取圖片文件,并且顯示出來。你可以直接點擊右鍵運行,也可以通過spark-submit運行:
./bin/spark-submit --driver-memory 8g
--py-files spark-deep-learning-assembly-0.1.0-spark2.2.jar \
--jars spark-deep-learning-assembly-0.1.0-spark2.2.jar \
--master local[*] spark-deep-learning/python/tests/Test.py
因為比較消耗內存,這里可以通過driver-memory 設置spark submit 內存。
來自:http://www.jianshu.com/p/07e8200b7cea