松本行弘:我為什么要開發新語言Steem(上)
原文 http://www.ituring.com.cn/article/177079
原文刊載于《日經Linux》2015/01號中(?日經BP社 2015)。原文題為《跟Matz邊做邊學編程語言:21世紀的并發編程語言》。文/松本行弘,譯/劉斌。
隨著多核CPU的普及,shell腳本的(一部分)價值也在逐漸被我們重新認識。shell腳本的基本計算模型是基于管道來連接多個進程。如果操 作系統支持多核的話,則各進程會被分配到不同的CPU上去執行,這樣就可以充分發揮多核CPU的優勢。同時這也證明了一點,那就是只要選擇合適的計算模 型,就能非常容易地實現并發執行。
在實際的業務系統中,我也聽說有人采用shell腳本來進行處理。雖說是用shell腳本進行信息的篩選和加工,但是和傳統的軟件開發模式相比,它有著成本低、靈活性高等優點。
shell腳本已經有些力不從心
但也并不能說shell腳本有多么理想,實際上它也有它的局限性。
比如,創建OS進程的成本非常高,如果需要使用shell腳本創建大量輕量進程的話,那么在性能上將會非常不利。
還有另外一種成本,由于連接進程的管道只能發送字節數組的數據,所以發送方需要先將數據轉換為字節數組,接收方則需要將字節數組還原。比如很多時 候我們都會使用以逗號分隔的CSV(Comma Separated Values)格式或表示JavaScript對象的JSON(JavaScript Object Notation)格式,將數據從這些格式轉換為字節數組,或者對字節數組進行解析并還原,這樣做的成本是非常高的。
在進行大數據處理、高性能計算等時,我們多會選擇使用多核CPU。因此,數據轉換或創建進程所花費的成本是不可忽視的。這可以說是shell腳本的一個缺陷。
更進一步來說,構成管道的進程(process)所執行的命令(command),可能并不是由同一個開發者所開發的,這些命令的參數設置方法等往往并不統一,因此要想熟練使用這些命令,難度會有所增加。
21世紀的shell腳本
這樣說來,如果能將shell腳本的優點,和通用編程語言的優點結合起來的話,應該就可以創造出一門非常強大的語言。
首先我們來看看這門強大的語言都需要滿足哪些必要條件。
第1個條件是可以進行輕量的并發。由于不管是OS級別的進程還是線程,創建成本都很高,因此我們應該盡量避免去使用它們。比較現實的方式是在一個 OS的進程中,預先生成與CPU的核數(+α)相同個數的線程,讓它們輪番去執行各種操作請求。采用這種實現方式的典型語言包括Erlang和Go。在本 文中,我們將相當于Erlang中的“process”、Go中的“goroutine”的概念稱為“任務”(task)。
第2個條件就是解決并發執行時的競爭條件。具體來說就是“狀態”的排除。也就是說,如果變量或者屬性的值發生變化,就會產生一個新的狀態,這也帶 來了因執行時機(timing)不同而產生問題的危險。所以需要將所有數據都設為不可變(immutable),這樣就可以避免因執行時機而出現的缺陷。
第3個條件是計算模型。線程模型雖然應用領域非常廣泛,但自由程度也很高,因此程序可能會變得難以掌控。于是我們可以參考shell的執行模型, 引入一個抽象度非常高的并發計算模型。抽象度高了,反過來表現的自由度就會降低,所以在編寫代碼的時候就要下一番功夫。而另一方面,我們的程序也會變得非 常容易調試。
新語言Streem
于是我就開始設計一門滿足上述條件的新語言。由于是以流(Stream)為計算模型的語言,因此我們就將它命名為“Streem”。
首先我們來看看它的語法。由于它是基于shell的,因此也沒有什么特別的語法。它的基本語法如下所示。
表達式1 | 表達式2 | ...
若要采用這種語法來實現一個從標準輸入讀取數據,然后輸出到標準輸出,功能類似于cat命令的程序,只需編寫如下代碼即可。
STDIN | STOUT
就是如此簡單。STDIN和STDOUT是用常量表示標準輸入輸出的對象。Streem程序中的STDIN會從標準輸入中一行一行地讀取(字符 串)數據,并將這一行行的數據傳遞給其他表達式,就像“流”一樣。像這樣用來表示數據的流動的對象就稱為“流”(對象)。STDOUT則正相反,它是一個 接收字符串參數并輸出到外部(標準輸出)的流。如果想讀取指定文件名的文件內容的話,可以使用如下代碼。
read(path)
如果是寫入到指定文件,則可以使用如下代碼。
write(path)
這兩個方法都會返回用來讀取或者寫入的流對象。
表達式
Streem的表達式包括常量、變量引用、函數調用、數組表達式、Map表達式、函數表達式、運算符表達式以及if表達式。它們的語法如表1所示。如果你有其他語言的編程經驗的話,這些內容應該都很容易理解。
表1 Streem的表達式
類型 | 語法 | 示例 |
字符串常量(字面值) | "字符串" | "foobar" |
數值常量 | 數值表現形式 | 123 |
符號(Symbol)常量 | :標識符 | :Foo |
變量引用 | 標識符 | FooBar |
函數調用 | 標識符(參數...) | square(2) |
方法調用 | 表達式.標識符(參數...) | ary.push(2) |
數組表達式 | [表達式, ...] | [1,2,3] |
Map表達式 | [表達式:表達式, ...] | [1:"2", 3:"4"], [:](空Map) |
函數表達式 | {|變量..| 語句 } | {|x| x + 1} |
運算符表達式 | 表達式 運算符 表達式 | 1 + 1 |
if表達式 | if 表達式 {語句 ...} else { 語句 ...} | if true {1} else {2} |
賦值
Streem中的賦值有兩種方式。第一種是和其他語言類似的使用“=”的方式。
ONE = 1
還有一種就是使用“->”的反方向賦值的方法。如果把上面采用等號的賦值語句改為使用“->”的話,代碼將如下所示。
1 -> ONE
這種賦值方式非常適合在將管道執行結果存放到變量時使用,這樣代碼的順序就能和程序的執行流程保持一致,非常方便。
不管采取上面哪種賦值方式,為了避免變量狀態的改變,都需要遵循如下規則。
-
規則1:不能給同一個變量進行多次賦值。對一個變量的賦值在其作用域之內只能進行一次。
-
規則2:僅在交互執行環境的頂層作用域(top level)下,才允許對一個變量進行重復賦值。不過,你可以將這看作是對變量名相同的不同變量進行的賦值。
語句的組合
Streem支持將多條表達式語句并列編寫。語句之間用分號(;)或者換行來分割。也可以認為這些語句會按照編碼的順序來執行。如果這些語句之間沒有依賴關系的話,在實際執行的時候也可能會并發執行。
Streem程序示例
接下來讓我們來看一些Streem程序的具體例子。
前面我們看到了一個實現類似于cat命令的例子,下面我們來看一個稍微有點不同的例子。這里我們將使用Streem來實現經常被拿來舉例的 FizzBuzz游戲(圖1)。這個游戲要求玩家從1開始輸入數字,當這個數字能被3整除的時候,會輸出“Fizz”;當能被5整除的時候,會輸出 “Buzz”;當能同時被3和5整除的時候,則會輸出“FizzBuzz”。
圖1 Streem版FizzBuzz
seq(100) | { |x| if x % 15 == { "FizzBuzz" } else if x % 3 == { "Fizz" } else if x % 5 == { "Buzz" } else { x } } | STOUT
seq函數用來生成一個從1到指定參數的整數數列。如果將該數列連接到管道上的話,則該數列會將各元素的值按順序傳遞給管道。STDOUT則將接收到的數列的值進行輸出。
從上面的例子我們可以看出,Streem的管道表述方式直接體現了要干什么,是不是更為直截了當呢?
1對1、1對n、n對m
通過圖1的例子,我們已經知道了使用Streem可以非常簡單地完成諸如對數值序列進行處理并輸出的程序。然而現實中的程序并不完全都是對這種1 對1關系的數據進行處理。比如類似于grep(單詞搜索)這樣“查找所有滿足指定條件”的類型,以及類似于wc(統計單詞數量)這樣對數據進行聚合計算的 類型。
Streem也支持這種應用場景,并提供了一些關鍵字來進行這類操作。
在一次執行需要返回多個值的時候,可以使用emit。如果給它傳遞多個(參數)值的話,那么它也會返回多個值。也就是說,
emit 1, 2
就相當于下面這行代碼。
emit 1; emit 2
此外,如果在數組前面加上“*”的話,就表示要返回這個數組的所有元素。比如,
a = [1, 2, 3]; emit *a
就相當于如下代碼。
emit 1; emit 2; emit 3
圖2是一個使用emit的例子。這個程序會將從1到100之間的整數每個都打印兩次。
圖2 使用emit的例子
# 將從1到100的整數分別打印兩次 seq(100) | {|x| emit x, x} | STDOUT
return用來終止函數的執行并返回值。return可以返回多個值,這時候它就相當于對多個值進行了emit操作。有一點前面我們沒有提到,那就是如果一個函數主體只有一個表達式的話,那么即使不使用return,這個表達式的執行結果也會作為函數的返回值。
使用emit和return的話,就可以產生比輸入值個數更多的返回值。與之相反,如果我們想生成少于輸入值個數的返回值的話,則可以使用 skip函數。skip用來終止當前函數的執行,但是并不產生任何返回值。圖3是一個使用skip的例子,該程序用來篩選出1到100之間的偶數。
圖3 使用skip的例子
# skip奇數,選擇偶數 seq(100) | {|x| if x % 2 == 1 {skip}; x} | STDOUT
不可變性(immutable)
前面我們已經說過,在Streem中,為了避免競爭條件的出現,所有的數據結構都是不可變的。數組和Map(類似于Ruby中的Hash)類型的 變量也是不可變的。向這些結構的數據添加新元素的時候,并不是直接修改已有的數據,而是在原數據的基礎上添加新元素來創建新的數據(圖4)。
圖4 修改immutable數據
a = [1, 2, 3, 4] # a是一個擁有4個元素的數組 b = a.push(5) # b是在a之后添加了5的數組
在一般的面向對象編程語言中,對象的屬性(實例的變量)都是可以修改的,而在Streem中,這種操作是被禁止的,這需要注意一下。從這一點上來說,Streem非常像函數式編程語言。
統計單詞出現次數
接著我們再看另一個Streem程序的例子。這里我們選擇了在介紹MapReduce時經常使用的一個例子——統計單詞出現次數。下面我們用Streem來實現一下(圖5)。
圖5 使用Streem統計單詞出現次數
STDIN | { |line| return *line.split } | reduce([:]) { |map, word| # [:]是一個空Map map.set(word, map.get(word,) + 1) } | STDOUT
首先我們對圖5的程序中新出現的語法進行說明。在調用reduce函數的地方,我們看到了類似于Ruby中的Block的語句。這是Streem 語言中的一個語法糖,如果函數的參數列表后面是一個函數表達式的話,那么這個函數表達式就會被視為該函數的參數列表的最后一個元素。也就是說表達式
reduce(0) {|x, y| x + y }
是下面的表達式的另一種寫法。
reduce(0, {|x, y| x + y })
這也是Streem為了能在普通函數調用中將類似于Ruby中的Block變量作為參數而做出的努力,而不必使用&block的方式。
如果我們看一下圖5中程序的實際執行情況,就會看到具體流程是首先從STDIN中一行行地讀取數據,用split進行單詞分割,再通過 reduce函數來統計各個單詞出現的次數,并將結果存放到Map中去。如果作為key的單詞不存在的話,map.get就會返回第二個參數作為默認值 (這里是0),這樣就可以通過map.get得到該單詞的出現次數。map.set用來更新單詞出現的次數,并創建一個新的Map。因為每次更新單詞出現 次數時都會創建一個新的Map,所以看上去有點浪費系統資源,但實際上我們無需為此擔心,完全可以將這些問題交給垃圾收集器或者系統運行時環境的內部實 現。實際上Clojure及Haskell等很多函數式編程語言也都采用了相同的策略。
最后,程序將生成的Map和STDIN通過管道連接起來,將Map中的鍵值對打印出來,顯示各個單詞及其出現次數。這個例子中我們并沒有做其他的額外處理,需要的話你可以增加一個管道以在輸出結果之前對單詞進行排序等工作。
Socket編程
Unix的Socket也是基于流而設計的,Streem當然也支持Socket操作。圖6的程序是一個最簡單的使用了Socket的網絡Echo服務器(將接收到的數據原封不動地返回給客戶端)。
圖6 Echo服務器程序
# 在8007端口提供服務 tcp_server(8007) | { |s| s | s # 直接將輸入數據作為輸出返回給客戶端 }
代碼是不是非常簡單?如果程序的應用場景非常匹配流模型,那么采用Streem語言的話,編碼工作將會非常簡單。
我們還是來解析一下這段代碼吧。tcp_server會在參數指定的端口上打開一個服務器端Socket進行監聽,等待客戶端的連接。在Streem中,服務器端Socket是客戶端Socket的流對象。
客戶端Socket是客戶端的輸入和輸出的流,所以如下代碼
s | s
的實際功能就是“原封不動地將客戶端輸入直接返回給客戶端”。如果需要對輸入內容進行加工處理的話,只需要在管道之間加入一個進行數據處理的流就可以了。
管道業務
目前為止我們看到的管道的組成都是如下方式。
表達式1 | 表達式2 ... | 表達式n
表達式1是一個產生值的流(產生器,generator),表達式2及后續表達式都是對值進行變換、處理的流(過濾器,filter),管道最后的表達式n則可以認為是輸出目的地(消費者,consumer)。
產生器有很多種,比如像STDIN這樣的從外部獲取輸入的流,以及像seq()這樣的通過計算產生值序列的函數。如果將產生器替換為一個函數表達式的話,那么這個函數表達式就成為了一個通過return或emit來產生值的產生器。
過濾器在大多數情況下都是一個函數,通過參數接收前面的流傳過來的值,再通過emit或return將值傳遞給下一個流。
最后的消費者只會接收值,是一個不會emit值的流。
Streem程序的基本結構就是像這樣將流通過管道串聯起來,從產生器開始對數據進行流式處理。也許我們也可以稱之為“管道業務”。雖說這種計算 模型并不是萬能的,但是它具有抽象程度高、容易理解、支持并發編程等優點。有時候我們并不需要做到100%的功能,而是專注于那重要的80%就可以了。
但是,并不是說所有程序中的數據流都只有一種(即一條管線),因此完全放棄這樣的程序的做法也有點過頭。我們需要更加復雜的管線配置。具體來說,我們還需要將多個流合并(merge)為一個流,以及從一個流派生出多條通知(廣播)這兩種類型的結構。
更進一步來說,在將流進行連接的時候,如果有一個能指定緩沖區大小的方法的話,是不是更好呢?
合并管道
到這里為止我們看到的例子中數據流都只有一條管線,這在簡單的應用場景下倒沒什么問題,但是這種方式并不能解決現實中的所有問題。
有時候我們可能需要將多個管道合并為一個,或者對一條管道進行分割操作。管道的合并可以使用“&”操作符。
管道1 & 管道2
通過使用“&”操作符,就能將管道1和管道2的值合并成一個數組,并創建一個新的管道。合并后的新管道在任意一個原管道(這里為管道1和 管道2)終止的時候都會同時終止。比如本文前面的cat的例子,我們如果想像cat -n一樣同時輸出行號的話,可以使用圖9中的代碼。
圖7 cat -n的實現代碼
seq() & STDIN | STDOUT
由于“&”運算符的優先級高于“|”,所以下面的代碼
a & b | c
會被解釋為
(a & b) | c
當省略seq()的參數的時候,該函數會從1開始進行無限循環。由于STDIN是從標準輸入一行一行地讀取數據并寫入到管道中的,因此管道合并的結果如下所示。
[行號, 行內容]
將這個流合并后得到的新數組寫入到STDOUT(標準輸出),就實現了帶行號的cat。從實用角度來講,也許我們還需要對行號進行顯示位數的格式化等工作,不過這也只需要你在STDOUT之前加入一個用來格式化的管道操作就可以了 注1 。
注1 由于Streem還在開發之中,因此還沒有格式化相關的規范。
通道緩沖(channel buffering)
如果管道中最后一個流不是消費者的話,則會返回一個被稱為“通道”(channel)的對象。比如下面的代碼。
seq() & STDIN -> sequence
這里的sequence就是一個用來表現合并了seq()產生的數列和從STDIN讀取的輸入內容的通道。我們可以將管道理解為使用通道將進行流處理的task串聯起來的結構。
當然各個流中對數據進行處理的速度都有所不同。如果前面的流中數據產生速度太快的話,就會將數據堆積到通道中,進而導致占用大量內存。反過來說,如果通道中沒有任何緩存數據的話,則會增加前面處理的等待時間,從而降低整體效率。
所以,Streem會將適當數量(當然這個數量既不多也不少最理想了)的通道放到緩沖區中。但是真正合適的緩沖區大小則是由程序來決定的,我們不 能進行準確的預測。從性能的角度來講,有時需要根據實際情況來手動設置這個緩沖區的大小。這時候我們可以使用chan()這個非常方便的函數。
chan()函數用來顯式地創建通道對象。管道運算符“|”的右邊如果是通道對象的話,則該通道就會直接作為輸出目的地。另外你也可以為 chan()指定一個整數型的參數,來設置緩沖區的大小。也就是說,如果我們想在圖9的程序中將緩沖區大小顯式地設置為3的話,代碼就會變為圖10那樣。
圖8 指定了緩沖區大小的cat -n
seq() & STDIN | chan(3) | STDOUT
如果將緩沖區大小設置為0的話,那么在一個通道對象被創建之后,直到其被消費掉之前,流會進行等待,這樣管道就會以前后交互的方式來運行。這在單核CPU環境下也許會非常實用。
廣播
在聊天類的應用程序中,一個人發送的消息要被廣播給所有參與聊天的成員。通道也可以應用在這種場景下。如果將通過chan()創建的通道連接到多個流的話,那么作為輸入發送給該通道的值就會被廣播給所有和其連接的流。
如果我們將圖6中的Echo服務器修改為聊天服務器,將接收到的消息發送給所有參與者,則代碼如圖9所示。
圖9 Chat服務器
broadcast = chan() # 打開8008端口上的服務 tcp_server(8008) | { |s| broadcast | s # 返回參與者的消息 s | broadcast # 將消息發送給所有參與者 }
聰明的你也許已經發現了,廣播通道是具有狀態的。也就是說,連接到broadcast的流作為消息接收方,是會被保存到broadcast中的。 另外,作為輸出目標的流如果關閉了的話,或者通過disconnect方法被顯式地斷開連接的話,則該流就不再是輸出目標了。immutable是基本的 Streem,但是為了編寫容易理解的程序,有時候我們需要犧牲一點純粹性。當然,由于broadcast的狀態變化在Streem內部實現了互斥操作, 因此即使在并行環境下運行也不會有問題。
總結
我們圍繞管道計算模型設計了的新語言Streem。如果是非常適合流處理的程序的話,寫起來將簡單得讓人吃驚。
實際上Streem語言剛開始設計沒多久,在達到實用的程度之前,還有許多需要考慮的東西。比如如何進行異常處理、如何支持用戶自定義流、類似于對象的概念該如何定義等問題。隨著軟件規模變得越來越大,編程語言不得不考慮的問題也會越來越多。
“這種語言不能用來編寫大型軟件項目”,這是編程語言設計者經常使用的“借口”。但是,只要這種語言還不是一無是處,還沒有什么證據能表明這種借口會有什么實際作用。
下次我們將會對Streem的設計進行更深入的講解,同時也會涉及一些具體的實現細節。