入門goroutine并發設計模式以及goroutine可視化工具
Daisy-Chain
首先,為了防止過于枯燥,我先列出我最喜歡的一個模式:Daisy-Chain。這個模式比較復雜,對go的并發編程不太熟悉的同學,可以先看下面的模式。然后回過頭來看這個。
daisy chain會創建很多channel,然后把這些channel首尾相接級聯起來,組成一條單向鏈,每個channel都在處理不同的子任務,最后的結果在鏈的末端輸出。
func f(left, right chan int) {
// 這個函數就把right的輸出和left的輸入聯系起來了。
left <- 1 + <-right
}
func main() {
const n = 10000
leftmost := make(chan int)
right := leftmost
left := leftmost
// 創建長度為n的daisy鏈
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
// 在鏈的最右端輸入1,那么最左端就會得到10001
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
}</code></pre>
整個過程類似下圖:

那么這個模式有什么用呢?它可以用來處理迭代算法,使得部分迭代運算并發執行。只要迭代的每個階段都是相互獨立的即可。比如,計算質數:
import (
"fmt"
"os"
"runtime/trace"
"time"
)
func Generate(ch chan<- int) {
for i := 2; ; i++ {
ch <- i
// 這是為了方便gotrace繪圖
time.Sleep(10 * time.Millisecond)
}
}
func Filter(ch <-chan int, out chan<- int, prime int) {
for {
i := <-ch
if i%prime != 0 {
out <- i
} else {
fmt.Printf("[%d] filter out %d\n", prime, i)
}
}
}
func main() {
// 這些也是gotrace要求插入的代碼。下同
trace.Start(os.Stderr)
ch := make(chan int)
go Generate(ch)
for i := 0; i < 10; i++ {
prime := <-ch // step1
fmt.Println(prime)
out := make(chan int) // step2
go Filter(ch, out, prime) // step3
ch = out // step4
}
trace.Stop()
}</code></pre>
仔細分析上面的代碼,它的功能就是輸出前10個正整數質數。至于細節就讓我們一步步分析看看:
首先,Generate從2開始遍歷正整數,并且在一開始就被放入goroutine里了。結果會放在 ch 里;
然后,在main中啟動一個for循環,在循環的每個step1,都會從 ch 中讀出一個質數。2當然是質數,但是后面每一步從ch中讀取的都是質數嗎?且看下面的代碼。
然后,step2會創建一個新channel out (類似上例的right),ch和它作為輸入和輸出創建一個Filter的goroutine,專門過濾能被step1的prime整除的數。所以在 out 中輸出的都是不會被prime整除的數。
最后在關鍵的step4, out 變成下一個 ch 。相當于增加了一節chain的長度。而ch在每個循環中輸出的第一個數,都是被 之前 的所有 質數 無法整除的數,即下一個質數。
輸出日志如下:
2
3
[2] filter out 4
5
[2] filter out 6
7
[2] filter out 8
[3] filter out 9
[2] filter out 10
11
[2] filter out 12
13
[2] filter out 14
[3] filter out 15
[2] filter out 16
17
[2] filter out 18
19
[2] filter out 20
[3] filter out 21
[2] filter out 22
23
[2] filter out 24
[5] filter out 25
[2] filter out 26
[3] filter out 27
[2] filter out 28
29
為了更直觀的展示整個過程,我用divan大神的 gotrace 工具畫出了goroutine的3d交互圖:

其中每個紅色豎線表示一個goroutine,時間軸是從上到下的,所以紅線越長表示goroutine持續時間越長,也說明它生成的越早。
可以看到,最早的一個goroutine獲得的數字是3,4,…… 29,因為2已經被輸出了,所以是3到29,然后下一個goroutine獲得的就是5,7,9,…… 29,因為3被輸出,而偶數都被過濾了。以此類推,最后輸出的就是前10個質數。
需要指出的是,這個算法并不是最高效的,但卻是非常優雅的。
關于gotrace的安裝和使用,請移步 這里 。我是根據他的方法給go1.6.3打了補丁后,就能使用了。
好了,下面我們 換些基礎的模式 講一下:
Ping-pong
顧名思義,就是由2個goroutine相互踢皮球組成的模式。盡管它非常簡單,但是卻方便我們理解go的并發編程概念。
代碼如下:
用一個int表示ball(球),管道表示table(桌子),兩個goroutine就是2個運動員, 分別編號為1和2。
func main() {
var Ball int
table := make(chan int)
go player("2", table)
go player("1", table)
// 首先把球放到“桌上”
table <- Ball
time.Sleep(1 * time.Second)
// 1s后比賽結束……
<-table
}
func player(id string, table chan int) {
for {
ball := <-table
log.Printf("%s got ball[%d]\n", id, ball)
time.Sleep(50 * time.Millisecond)
log.Printf("%s bounceback ball[%d]\n", id, ball)
ball++
table <- ball
}
}</code></pre>
輸出如下:
1 got ball[0]
1 bounceback ball[0]
2 got ball[1]
2 bounceback ball[1]
1 got ball[2]
1 bounceback ball[2]
2 got ball[3]
2 bounceback ball[3]
1 got ball[4]
1 bounceback ball[4]
2 got ball[5]
2 bounceback ball[5]
代碼簡潔易懂,很好理解(看不懂的同學請不要拍我)。
下面,我們增加一位選手,讓3個運動員一塊打球
go player("2", table)
go player("3", table)
go player("1", table)
這下子熱鬧了,輸出如下:
1 got ball[0]
1 bounceback ball[0]
2 got ball[1]
2 bounceback ball[1]
3 got ball[2]
3 bounceback ball[2]
1 got ball[3]
1 bounceback ball[3]
2 got ball[4]
2 bounceback ball[4]
3 got ball[5]
3 bounceback ball[5]
1 got ball[6]
1 bounceback ball[6]
2 got ball[7]
2 bounceback ball[7]
3 got ball[8]
3 bounceback ball[8]
看3個人有條不紊的相互擊球。此時處女座一定非常滿意,但是對于習慣了并發隨機性的程序員來說,這實在有些過于美好:為什么它們的順序如此協調,為什么1總是給2,2給3,3給1,而不是其他順序呢?
劃重點了啊:
The answer is because Go runtime holds waiting FIFO queue for receivers, that is goroutines ready to receive on the particular channel
即,對于接收channel內容的goroutines來說,go的runtime會把它們分配到一個 FIFO隊列 中,所以這些goroutines只能按照既定的順序接收channel的內容,而不會弄亂。所以即使創建上百個palyers,順序依然是固定的。go實在是太貼心了,有不有!
Fan-In
也叫“扇入”,應該是并發編程里面比較普通的一個模式了。fan-in會從多個管道讀取輸入,并匯總到一個channel輸出,形象的比喻如下圖:

示例代碼如下
import (
"fmt"
"math/rand"
"os"
"runtime/trace"
"time"
)
func main() {
trace.Start(os.Stderr)
c := fanIn(boring(1), boring(2))
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
fmt.Println("You're both boring; I'm leaving.")
trace.Stop()
}
func fanIn(input1, input2 <-chan int) <-chan int {
c := make(chan int)
go func() { for {c <- <-input1} }()
go func() { for {c <- <-input2} }()
return c
}
func boring(msg int) <-chan int {
c := make(chan int)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- msg1000 + i
time.Sleep(time.Duration(rand.Intn(1e3)) time.Millisecond)
}
}()
return c // Return the channel to the caller.
}</code></pre>
輸出為:
2000
2001
1001
2002
1002
2003
1003
2004
1004
gotrace輸出為(注意這是兩次獨立的運行結果):

可以看到,兩次的結果都匯入了main線程,并且順序輸出,沒有丟失數據,也沒有死鎖。
當然,簡單的情況,用select也可以。
select設計的目的就是在channel中間通訊,誰的數據先到達,哪個case分支先執行。
c1 := boring(1)
c2 := boring(2)
for i := 0; i < 10; i++ {
select {
case v := <-c1:
fmt.Println(v)
case v := <-c2:
fmt.Println(v)
}
}
Workers
也叫FanOut(扇出),和扇入模式相反,工作模式是一個管道分發任務,多個goroutines來執行。
示例代碼如下:
import (
"fmt"
"os"
"runtime/trace"
"sync"
"time"
)
func worker(ch <-chan int, wg sync.WaitGroup) {
defer wg.Done()
for {
task, ok := <-ch
if !ok {
return
}
time.Sleep(20 time.Millisecond)
fmt.Println("processing task", task)
}
}
func pool(wg *sync.WaitGroup, workers, tasks int) {
ch := make(chan int)
for i := 0; i < workers; i++ {
time.Sleep(1 * time.Millisecond)
// spawn出很多worker線程
go worker(ch, wg)
}
for i := 0; i < tasks; i++ {
time.Sleep(10 * time.Millisecond)
// 開始分發任務,被激活的workers開始工作了
ch <- i
}
close(ch)
}
func main() {
trace.Start(os.Stderr)
var wg sync.WaitGroup
wg.Add(36)
go pool(&wg, 36, 36)
wg.Wait()
trace.Stop()
}</code></pre>
代碼略長,但是邏輯其實非常清晰。我在注釋中也稍作了說明。
注意( 劃重點了 ), close(ch) 在這里很關鍵,它確定了每個worker退出的節點。當channel中的內容為空,同時它已經被close時, task, ok := <- ch 返回的ok==false,此時通知worker退出,wg標記完成,當所有的worker都完成時,wg.Wait()完成,轉入下一行執行。
在golang中,main不會自動等待所有子進程完成,如果沒有退出檢查,main進程會閃退,所有的子進程也會隨之強制退出,所以在main里必須有退出檢測機制,前幾個例子我們使用的是time.Sleep和for循環,這里我們使用了WaitGroup。
gotrace結果如下:

圓柱體中心就是main進程中生成的pool進程,圍繞它的是36個worker進程。藍色箭頭表示pool每隔10ms分發的任務,它們都被worker處理了。
Servers
server模式和fan_out類似,只不過它的worker線程是按需生成的,并且工作處理完畢后就釋放。所以這種模式常應用到網站服務器上。在主進程中,有一個for循環,Accept函數一直阻塞著循環的進行,一旦有新的請求過來,Accept就會生成一個connection,然后主進程就創建一個子進程處理這個connection以及其他邏輯。
示例代碼如下:
import (
"fmt"
"net"
"os"
"runtime/trace"
"time"
)
func handler(c net.Conn, ch chan int) {
ch <- len(c.RemoteAddr().String())
time.Sleep(10 * time.Microsecond)
c.Write([]byte("ok"))
c.Close()
}
func logger(ch chan int) {
for {
time.Sleep(1500 * time.Millisecond)
fmt.Println(<-ch)
}
}
func server(l net.Listener, ch chan int) {
for {
c, err := l.Accept()
if err != nil {
continue
}
go handler(c, ch)
}
}
func main() {
trace.Start(os.Stderr)
l, err := net.Listen("tcp", ":5000")
if err != nil {
panic(err)
}
ch := make(chan int)
go logger(ch)
go server(l, ch)
time.Sleep(10 * time.Second)
trace.Stop()
}</code></pre>
可以看到,主進程生成了一個tcp連接,啟動了server和logger兩個子進程。server用來監聽外網的請求,一旦請求過來,就會生成一個handler進程,用來處理connection。同時,handler還會通過管道和logger通訊,logger負責異步記錄相應日志。
這個程序運行時的輸入需要模擬外部請求來產生,為此我寫了一個腳本:
#!/bin/sh
i=0
while [[ $i -lt 20 ]];
do
# 通過nc發起tcp請求。每秒請求一次
echo "hello "$i | nc localhost 5000
sleep 1
((++i))
done</code></pre>
運行時,先啟動這個腳本,然后啟動server或gotrace。
gotrace的運行結果如下:

可以看到,盡管程序運行了10s,但是只處理了6個請求。這是因為logger占用了管道太長時間,使得handler的運行時間也延長到了1.5s以上。
為了解決這個問題,我們正好借助上面介紹的Worker模式,提高logger的并發性。
Server + Worker
import (
"fmt"
"net"
"os"
"runtime/trace"
"time"
)
func handler(c net.Conn, ch chan int) {
ch <- 0
time.Sleep(50 * time.Microsecond)
c.Write([]byte("ok"))
c.Close()
}
func logger(wch chan int) {
for {
fmt.Println(<-wch)
// 這里主要耗時
time.Sleep(1500 * time.Millisecond)
}
}
func pool(ch chan int, n int) {
wch := make(chan int)
for i := 0; i < n; i++ {
go logger(wch)
}
for {
wch <- <-ch
}
}
func server(l net.Listener, ch chan int) {
for {
c, err := l.Accept()
if err != nil {
continue
}
go handler(c, ch)
}
}
func main() {
trace.Start(os.Stderr)
l, err := net.Listen("tcp", ":5000")
if err != nil {
panic(err)
}
ch := make(chan int)
go pool(ch, 36)
go server(l, ch)
time.Sleep(10 * time.Second)
trace.Stop()
}</code></pre>
其中pool函數跟上例類似,就是生成(spawn)很多worker,然后handle中生成的數據會先進入pool,由pool再分配給這些workers。
3D圖如下:

可以看到,此時server正好處理了10個請求。不再被logger拖延了。
Concurrency & Parallelism
注意我的題目是并發(concurrent)設計模式。那么并發和并行到底啥區別??
-
Concurrency: A condition that exists when at least two threads are making progress. A more generalized form of parallelism that can include time-slicing as a form of virtual parallelism.
Concurrency(并發性):是一種廣義的并行。在concurrence的語境下,兩個線程/任務可以表面上看起來“像是”并行,但其實機器只有一個核,它們只是分享了時間塊。當然,在多核的情況下,它可以是并行的。
-
Parallelism(并行性):A condition that arises when at least two threads are executing simultaneously.
這個就是狹義的并行,即線程、任務必須是同時進行的,否則不算parallelism。
來自:https://segmentfault.com/a/1190000007111208