Spark 入門(Python、Scala 版)
本文中,我們將首先討論如何在本地機器上利用Spark進行簡單分析。然后,將在入門級水平探索Spark,了解Spark是什么以及它如何工作(希望可以激發更多探索)。最后兩節將開始通過命令行與Spark進行交互,然后演示如何用Python寫Spark應用,并作為Spark作業提交到集群上。同時也會提供相應的 Scala 版本。
1、設置Spark環境
在本機設置和運行Spark非常簡單。你只需要下載一個預構建的包,只要你安裝了Java 6+和Python 2.6+,就可以在Windows、Mac OS X和Linux上運行Spark。確保java程序在PATH環境變量中,或者設置了JAVA_HOME環境變量。類似的,python也要在PATH中。
假設你已經安裝了Java和Python,以及 Spark,如果沒有請參照之前的教程:
《Spark 偽分布式 & 全分布式 安裝指南》:http://my.oschina.net/leejun2005/blog/394928
注意:如果要用到下文的 pyspark,則需要設置 python 相關的 spark 包路徑:
vi .bashrc export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH
否則會報錯: ImportError: No module named pyspark 或者 ImportError: No module named py4j.java_gateway
source這些配置(或者重啟終端)之后,你就可以在本地運行一個pyspark解釋器。執行pyspark命令,你會看到以下結果:
~$ pyspark Python 2.7.8 (default, Dec 2 2014, 12:45:58) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin Type "help", "copyright", "credits" or "license" for more information. Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties [… snip …] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ `_/ /__ / .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Python version 2.7.8 (default, Dec 2 2014 12:45:58) SparkContext available as sc. >>>
現在Spark已經安裝完畢,可以在本機以”單機模式“使用。你可以在本機開發應用并提交Spark作業,這些作業將以多進程/多線程模式運行的,或者,配置該機器作為一個集群的客戶端(不推薦這樣做,因為在Spark作業中,驅動程序(driver)是個很重要的角色,并且應該與集群的其他部分處于相同網絡)。
2、簡化 Spark 終端輸出
Spark(和PySpark)的執行可以特別詳細,很多INFO日志消息都會打印到屏幕。開發過程中,這些非常惱人,因為可能丟失Python棧跟蹤或者print的輸出。為了減少Spark輸出 – 你可以設置$SPARK_HOME/conf下的log4j。首先,拷貝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”擴展名。
編輯新文件,用WARN替換代碼中出現的INFO。你的log4j.properties文件類似:
~$ pyspark Python 2.7.8 (default, Dec 2 2014, 12:45:58) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin Type "help", "copyright", "credits" or "license" for more information. Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties [… snip …] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ `_/ /__ / .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Python version 2.7.8 (default, Dec 2 2014 12:45:58) SparkContext available as sc. >>>
現在運行PySpark,輸出消息將會更簡略!
3、測試 Spark context
talk is cheap,show you the code. 咱們先來測試下 Spark 環境是否正常:
from pyspark import SparkContext from pyspark.streaming import StreamingContext sc = SparkContext("spark://110.9.17.187:8070", "NetworkWordCount") data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) distData.reduce(lambda a, b: a + b)
如果你能得到一個數字 15,而且沒有錯誤發生,那么你的context正確工作了!
4、Spark是什么?
既然設置好了Spark,現在我們討論下Spark是什么。Spark是個通用的集群計算框架,通過將大量數據集計算任務分配到多臺計算機上,提供高效內存計算。如果你熟悉Hadoop,那么你知道分布式計算框架要解決兩個問題:如何分發數據和如何分發計算。Hadoop使用HDFS來解決分布式數據問題,MapReduce計算范式提供有效的分布式計算。類似的,Spark擁有多種語言的函數式編程API,提供了除map和reduce之外更多的運算符,這些操作是通過一個稱作彈性分布式數據集(resilient distributed datasets, RDDs)的分布式數據框架進行的。
本質上,RDD是種編程抽象,代表可以跨機器進行分割的只讀對象集合。RDD可以從一個繼承結構(lineage)重建(因此可以容錯),通過并行操作訪問,可以讀寫HDFS或S3這樣的分布式存儲,更重要的是,可以緩存到worker節點的內存中進行立即重用。由于RDD可以被緩存在內存中,Spark對迭代應用特別有效,因為這些應用中,數據是在整個算法運算過程中都可以被重用。大多數機器學習和最優化算法都是迭代的,使得Spark對數據科學來說是個非常有效的工具。另外,由于Spark非常快,可以通過類似Python REPL的命令行提示符交互式訪問。
Spark庫本身包含很多應用元素,這些元素可以用到大部分大數據應用中,其中包括對大數據進行類似SQL查詢的支持,機器學習和圖算法,甚至對實時流數據的支持。
核心組件如下:
-
Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構建在RDD和Spark Core之上的。
</li> -
Spark SQL:提供通過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行交互的API。每個數據庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。對熟悉Hive和HiveQL的人,Spark可以拿來就用。
</li> -
Spark Streaming:允許對實時數據流進行處理和控制。很多實時數據庫(如Apache Store)可以處理實時數據。Spark Streaming允許程序能夠像普通RDD一樣處理實時數據。
</li> -
MLlib:一個常用機器學習算法庫,算法被實現為對RDD的Spark操作。這個庫包含可擴展的學習算法,比如分類、回歸等需要對大量數據集進行迭代的操作。之前可選的大數據機器學習庫Mahout,將會轉到Spark,并在未來實現。
</li> -
GraphX:控制圖、并行圖操作和計算的一組算法和工具的集合。GraphX擴展了RDD API,包含控制圖、創建子圖、訪問路徑上所有頂點的操作。
</li> </ul>由于這些組件滿足了很多大數據需求,也滿足了很多數據科學任務的算法和計算上的需要,Spark快速流行起來。不僅如此,Spark也提供了使用Scala、Java和Python編寫的API;滿足了不同團體的需求,允許更多數據科學家簡便地采用Spark作為他們的大數據解決方案。
5、對Spark編程
編寫Spark應用與之前實現在Hadoop上的其他數據流語言類似。代碼寫入一個惰性求值的驅動程序(driver program)中,通過一個動作(action),驅動代碼被分發到集群上,由各個RDD分區上的worker來執行。然后結果會被發送回驅動程序進行聚合或編譯。本質上,驅動程序創建一個或多個RDD,調用操作來轉換RDD,然后調用動作處理被轉換后的RDD。
這些步驟大體如下:
(1)定義一個或多個RDD,可以通過獲取存儲在磁盤上的數據(HDFS,Cassandra,HBase,Local Disk),并行化內存中的某些集合,轉換(transform)一個已存在的RDD,或者,緩存或保存。
(2)通過傳遞一個閉包(函數)給RDD上的每個元素來調用RDD上的操作。Spark提供了除了Map和Reduce的80多種高級操作。
(3)使用結果RDD的動作(action)(如count、collect、save等)。動作將會啟動集群上的計算。
當Spark在一個worker上運行閉包時,閉包中用到的所有變量都會被拷貝到節點上,但是由閉包的局部作用域來維護。Spark提供了兩種類型的共享變量,這些變量可以按照限定的方式被所有worker訪問。廣播變量會被分發給所有worker,但是是只讀的。累加器這種變量,worker可以使用關聯操作來“加”,通常用作計數器。
Spark應用本質上通過轉換和動作來控制RDD。后續文章將會深入討論,但是理解了這個就足以執行下面的例子了。
6、Spark的執行
簡略描述下Spark的執行。本質上,Spark應用作為獨立的進程運行,由驅動程序中的SparkContext協調。這個context將會連接到一些集群管理者(如YARN),這些管理者分配系統資源。集群上的每個worker由執行者(executor)管理,執行者反過來由SparkContext管理。執行者管理計算、存儲,還有每臺機器上的緩存。
重點要記住的是應用代碼由驅動程序發送給執行者,執行者指定context和要運行的任務。執行者與驅動程序通信進行數據分享或者交互。驅動程序是Spark作業的主要參與者,因此需要與集群處于相同的網絡。這與Hadoop代碼不同,Hadoop中你可以在任意位置提交作業給JobTracker,JobTracker處理集群上的執行。
7、與Spark交互
使用Spark最簡單的方式就是使用交互式命令行提示符。打開PySpark終端,在命令行中打出pyspark。
PySpark將會自動使用本地Spark配置創建一個SparkContext。你可以通過sc變量來訪問它。我們來創建第一個RDD。
>>> text = sc.textFile("shakespeare.txt") >>> print text shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
textFile方法將莎士比亞全部作品加載到一個RDD命名文本。如果查看了RDD,你就可以看出它是個MappedRDD,文件路徑是相對于當前工作目錄的一個相對路徑(記得傳遞磁盤上正確的shakespear.txt文件路徑)。我們轉換下這個RDD,來進行分布式計算的“hello world”:“字數統計”。
>>> from operator import add >>> def tokenize(text): ... return text.split() ... >>> words = text.flatMap(tokenize) >>> print words PythonRDD[2] at RDD at PythonRDD.scala:43
我們首先導入了add操作符,它是個命名函數,可以作為加法的閉包來使用。我們稍后再使用這個函數。首先我們要做的是把文本拆分為單詞。我們創建了一個tokenize函數,參數是文本片段,返回根據空格拆分的單詞列表。然后我們通過給flatMap操作符傳遞tokenize閉包對textRDD進行變換創建了一個wordsRDD。你會發現,words是個PythonRDD,但是執行本應該立即進行。顯然,我們還沒有把整個莎士比亞數據集拆分為單詞列表。
如果你曾使用MapReduce做過Hadoop版的“字數統計”,你應該知道下一步是將每個單詞映射到一個鍵值對,其中鍵是單詞,值是1,然后使用reducer計算每個鍵的1總數。
首先,我們map一下。
>>> wc = words.map(lambda x: (x,1)) >>> print wc.toDebugString() (2) PythonRDD[3] at RDD at PythonRDD.scala:43 | shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 | shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2
我使用了一個匿名函數(用了Python中的lambda關鍵字)而不是命名函數。這行代碼將會把lambda映射到每個單詞。因此,每個x都是一個單詞,每個單詞都會被匿名閉包轉換為元組(word, 1)。為了查看轉換關系,我們使用toDebugString方法來查看PipelinedRDD是怎么被轉換的。可以使用reduceByKey動作進行字數統計,然后把統計結果寫到磁盤。
>>> counts = wc.reduceByKey(add) >>> counts.saveAsTextFile("wc")
一旦我們最終調用了saveAsTextFile動作,這個分布式作業就開始執行了,在作業“跨集群地”(或者你本機的很多進程)運行時,你應該可以看到很多INFO語句。如果退出解釋器,你可以看到當前工作目錄下有個“wc”目錄。
$ ls wc /_SUCCESS part-00000 part-00001
每個part文件都代表你本機上的進程計算得到的被保持到磁盤上的最終RDD。如果對一個part文件進行head命令,你應該能看到字數統計元組。
$ head wc/part-00000 (u'fawn', 14) (u'Fame.', 1) (u'Fame,', 2) (u'kinghenryviii@7731', 1) (u'othello@36737', 1) (u'loveslabourslost@51678', 1) (u'1kinghenryiv@54228', 1) (u'troilusandcressida@83747', 1) (u'fleeces', 1) (u'midsummersnightsdream@71681', 1)
注意這些鍵沒有像Hadoop一樣被排序(因為Hadoop中Map和Reduce任務中有個必要的打亂和排序階段)。但是,能保證每個單詞在所有文件中只出現一次,因為你使用了reduceByKey操作符。你還可以使用sort操作符確保在寫入到磁盤之前所有的鍵都被排過序。
一個完整的例子:
from pyspark import SparkContext sc = SparkContext("spark://110.9.17.187:8070", "NetworkWordCount") lines = sc.textFile("hdfs://110.9.17.187:8020/tmp/num.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b) print totalLength
# lines.count()</pre>
scala 版本如下:
val lines = sc.textFile("hdfs://110.9.17.187:8020/tmp/num.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)
PS:我這邊用上面的兩段代碼測試發現(一億五千萬隨機數,500MB),scala 比 python 快了 20 倍,跟官方的性能數據相差太遠了,
應該是pythonAPI或者環境哪里有問題~
8、編寫一個Spark應用
編寫Spark應用與通過交互式控制臺使用Spark類似。API是相同的。首先,你需要訪問<SparkContext,它已經由<pyspark自動加載好了。
使用Spark編寫Spark應用的一個基本模板如下:
## Spark Application - execute with spark-submit: spark-submit app.py
Imports
from pyspark import SparkConf, SparkContext
Module Constants
APP_NAME = "My Spark Application"
Closure Functions
Main functionality
def main(sc): pass if name == "main": # Configure Spark conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local[*]") sc = SparkContext(conf=conf) # Execute Main functionality main(sc)</pre>
這個模板列出了一個Spark應用所需的東西:導入Python庫,模塊常量,用于調試和Spark UI的可識別的應用名稱,還有作為驅動程序運行的一些主要分析方法學。在ifmain中,我們創建了SparkContext,使用了配置好的context執行main。我們可以簡單地導入驅動代碼到pyspark而不用執行。注意這里Spark配置通過setMaster方法被硬編碼到SparkConf,一般你應該允許這個值通過命令行來設置,所以你能看到這行做了占位符注釋。
使用<sc.stop()或<sys.exit(0)來關閉或退出程序。
## Spark Application - execute with spark-submit
Imports
import csv import matplotlib.pyplot as plt from StringIO import StringIO from datetime import datetime from collections import namedtuple from operator import add, itemgetter from pyspark import SparkConf, SparkContext
Module Constants
APP_NAME = "Flight Delay Analysis" DATE_FMT = "%Y-%m-%d" TIME_FMT = "%H%M" fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep', 'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance') Flight = namedtuple('Flight', fields)
Closure Functions
def parse(row): """ Parses a row and returns a named tuple. """ row[0] = datetime.strptime(row[0], DATE_FMT).date() row[5] = datetime.strptime(row[5], TIME_FMT).time() row[6] = float(row[6]) row[7] = datetime.strptime(row[7], TIME_FMT).time() row[8] = float(row[8]) row[9] = float(row[9]) row[10] = float(row[10]) return Flight(row[:11]) def split(line): """ Operator function for splitting a line with csv module """ reader = csv.reader(StringIO(line)) return reader.next() def plot(delays): """ Show a bar chart of the total delay per airline """ airlines = [d[0] for d in delays] minutes = [d[1] for d in delays] index = list(xrange(len(airlines))) fig, axe = plt.subplots() bars = axe.barh(index, minutes) # Add the total minutes to the right for idx, air, min in zip(index, airlines, minutes): if min > 0: bars[idx].set_color('#d9230f') axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center') else: bars[idx].set_color('#469408') axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center') # Set the ticks ticks = plt.yticks([idx+ 0.5 for idx in index], airlines) xt = plt.xticks()[0] plt.xticks(xt, [' '] len(xt)) # minimize chart junk plt.grid(axis = 'x', color ='white', linestyle='-') plt.title('Total Minutes Delayed per Airline') plt.show()
Main functionality
def main(sc): # Load the airlines lookup dictionary airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect()) # Broadcast the lookup dictionary to the cluster airline_lookup = sc.broadcast(airlines) # Read the CSV Data into an RDD flights = sc.textFile("ontime/flights.csv").map(split).map(parse) # Map the total delay to the airline (joined using the broadcast value) delays = flights.map(lambda f: (airline_lookup.value[f.airline], add(f.dep_delay, f.arv_delay))) # Reduce the total delay for the month to the airline delays = delays.reduceByKey(add).collect() delays = sorted(delays, key=itemgetter(1)) # Provide output from the driver for d in delays: print "%0.0f minutes delayed\t%s" % (d[1], d[0]) # Show a bar chart of the delays plot(delays) if name == "main": # Configure Spark conf = SparkConf().setMaster("local[*]") conf = conf.setAppName(APP_NAME) sc = SparkContext(conf=conf) # Execute Main functionality main(sc)</pre>
使用<spark-submit命令來運行這段代碼(假設你已有ontime目錄,目錄中有兩個CSV文件):
~$ spark-submit app.py
這個Spark作業使用本機作為master,并搜索app.py同目錄下的ontime目錄下的2個CSV文件。最終結果顯示,4月的總延誤時間(單位分鐘),既有早點的(如果你從美國大陸飛往夏威夷或者阿拉斯加),但對大部分大型航空公司都是延誤的。注意,我們在app.py中使用matplotlib直接將結果可視化出來了:
這段代碼做了什么呢?我們特別注意下與Spark最直接相關的main函數。首先,我們加載CSV文件到RDD,然后把split函數映射給它。split函數使用csv模塊解析文本的每一行,并返回代表每行的元組。最后,我們將collect動作傳給RDD,這個動作把數據以Python列表的形式從RDD傳回驅動程序。本例中,airlines.csv是個小型的跳轉表(jump table),可以將航空公司代碼與全名對應起來。我們將轉移表存儲為Python字典,然后使用sc.broadcast廣播給集群上的每個節點。
接著,main函數加載了數據量更大的flights.csv([譯者注]作者筆誤寫成fights.csv,此處更正)。拆分CSV行完成之后,我們將parse函數映射給CSV行,此函數會把日期和時間轉成Python的日期和時間,并對浮點數進行合適的類型轉換。每行作為一個NamedTuple保存,名為Flight,以便高效簡便地使用。
有了Flight對象的RDD,我們映射一個匿名函數,這個函數將RDD轉換為一些列的鍵值對,其中鍵是航空公司的名字,值是到達和出發的延誤時間總和。使用reduceByKey動作和add操作符可以得到每個航空公司的延誤時間總和,然后RDD被傳遞給驅動程序(數據中航空公司的數目相對較少)。最終延誤時間按照升序排列,輸出打印到了控制臺,并且使用matplotlib進行了可視化。
這個例子稍長,但是希望能演示出集群和驅動程序之間的相互作用(發送數據進行分析,結果取回給驅動程序),以及Python代碼在Spark應用中的角色。
9、實時處理:Spark Streaming
Spark Streaming 主要用來做實時處理,其原理本質上是“更細粒度的批處理”:
from pyspark import SparkContext from pyspark.streaming import StreamingContext
Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("spark://110.9.17.187:8070", "NetworkWordCount") ssc = StreamingContext(sc, 3)
Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("110.9.17.187", 9999)
Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
Count each word in each batch
pairs = words.map(lambda word: (word, 1)) wordCounts = pairs.reduceByKey(lambda x, y: x + y)
Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint() ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate</pre>
Scala 版本:
import org.apache.spark. import org.apache.spark.streaming.
// not necessary in Spark 1.3+
// Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario.
object Streaming { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) // Split each line into words val words = lines.flatMap(.split(" ")) // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey( + _)
// Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate } }</pre>
測試:
# TERMINAL 1:
Running Netcat
$ nc -lk 9999
hello world
...</pre>
# TERMINAL 2: RUNNING NetworkWordCount spark-submit TestScala.jar localhost 9999
...
Time: 1357008430000 ms
(hello,1) (world,1) ...</pre>
10、結論
盡管算不上一個完整的Spark入門,我們希望你能更好地了解Spark是什么,如何使用進行快速、內存分布式計算。至少,你應該能將Spark運行起來,并開始在本機或Amazon EC2上探索數據。
Spark不能解決分布式存儲問題(通常Spark從HDFS中獲取數據),但是它為分布式計算提供了豐富的函數式編程API。這個框架建立在伸縮分布式數據集(RDD)之上。RDD是種編程抽象,代表被分區的對象集合,允許進行分布式操作。RDD有容錯能力(可伸縮的部分),更重要的時,可以存儲到節點上的worker內存里進行立即重用。內存存儲提供了快速和簡單表示的迭代算法,以及實時交互分析。
由于Spark庫提供了Python、Scale、Java編寫的API,以及內建的機器學習、流數據、圖算法、類SQL查詢等模塊;Spark迅速成為當今最重要的分布式計算框架之一。與YARN結合,Spark提供了增量,而不是替代已存在的Hadoop集群,它將成為未來大數據重要的一部分,為數據科學探索鋪設了一條康莊大道。
11、Refer:
[1] Spark入門(Python版)
http://blog.jobbole.com/86232/
[2] Spark編程指南筆記
http://blog.javachen.com/2015/02/03/spark-programming-guide/#
[3] Spark Streaming Programming Guide
https://spark.apache.org/docs/latest/streaming-programming-guide.html
[4] 大數據算命系列(8): spark框架與pyspark簡介
https://github.com/renewjoy/bigdata-fortune-telling/blob/master/08_pyspark/pyspark.rst
[5] PySpark內部實現
http://blog.csdn.net/lantian0802/article/details/36376873
[6] Spark Streaming
http://debugo.com/spark-streaming/
[7] Spark新年福音:一個用于大規模數據科學的API——DataFrame
http://www.csdn.net/article/2015-02-17/2823997
[8] Python vs. Scala vs. Spark
http://emptypipes.org/2015/01/17/python-vs-scala-vs-spark/
[9] Spark1.0.0 應用程序部署工具spark-submit
http://blog.csdn.net/book_mmicky/article/details/25714545
【Spark1.3官方翻譯】 Spark Submit提交應用程序,spark1.3spark
http://www.bkjia.com/yjs/980456.html
[10] 使用IntelliJ IDEA開發Spark1.0.0應用程序
http://blog.csdn.net/book_mmicky/article/details/25714549
Scala從零開始:使用Intellij IDEA寫hello world
http://blog.csdn.net/asongoficeandfire/article/details/26412493
Scala從零開始:使用Scala IDE寫hello world
http://blog.csdn.net/asongoficeandfire/article/details/21490101
運行第一個SparkStreaming程序(及過程中問題解決)
http://www.jianshu.com/p/59733597d448#
來自:http://my.oschina.net/leejun2005/blog/411605