入門goroutine并發設計模式以及goroutine可視化工具

zmoh7166 9年前發布 | 13K 次閱讀 Goroutine 并發 Google Go/Golang開發

Daisy-Chain

首先,為了防止過于枯燥,我先列出我最喜歡的一個模式:Daisy-Chain。這個模式比較復雜,對go的并發編程不太熟悉的同學,可以先看下面的模式。然后回過頭來看這個。

daisy chain會創建很多channel,然后把這些channel首尾相接級聯起來,組成一條單向鏈,每個channel都在處理不同的子任務,最后的結果在鏈的末端輸出。

func f(left, right chan int) {
    // 這個函數就把right的輸出和left的輸入聯系起來了。
    left <- 1 + <-right
}

func main() { const n = 10000 leftmost := make(chan int) right := leftmost left := leftmost // 創建長度為n的daisy鏈 for i := 0; i < n; i++ { right = make(chan int) go f(left, right) left = right } // 在鏈的最右端輸入1,那么最左端就會得到10001 go func(c chan int) { c <- 1 }(right) fmt.Println(<-leftmost) }</code></pre>

整個過程類似下圖:

那么這個模式有什么用呢?它可以用來處理迭代算法,使得部分迭代運算并發執行。只要迭代的每個階段都是相互獨立的即可。比如,計算質數:

import (
       "fmt"
       "os"
       "runtime/trace"
       "time"
)
func Generate(ch chan<- int) {
       for i := 2; ; i++ {
           ch <- i
           // 這是為了方便gotrace繪圖
           time.Sleep(10 * time.Millisecond)
       }
}

func Filter(ch <-chan int, out chan<- int, prime int) { for { i := <-ch if i%prime != 0 { out <- i } else { fmt.Printf("[%d] filter out %d\n", prime, i) } } }

func main() { // 這些也是gotrace要求插入的代碼。下同 trace.Start(os.Stderr) ch := make(chan int) go Generate(ch) for i := 0; i < 10; i++ { prime := <-ch // step1 fmt.Println(prime) out := make(chan int) // step2 go Filter(ch, out, prime) // step3 ch = out // step4 } trace.Stop() }</code></pre>

仔細分析上面的代碼,它的功能就是輸出前10個正整數質數。至于細節就讓我們一步步分析看看:

首先,Generate從2開始遍歷正整數,并且在一開始就被放入goroutine里了。結果會放在 ch 里;

然后,在main中啟動一個for循環,在循環的每個step1,都會從 ch 中讀出一個質數。2當然是質數,但是后面每一步從ch中讀取的都是質數嗎?且看下面的代碼。

然后,step2會創建一個新channel out (類似上例的right),ch和它作為輸入和輸出創建一個Filter的goroutine,專門過濾能被step1的prime整除的數。所以在 out 中輸出的都是不會被prime整除的數。

最后在關鍵的step4, out 變成下一個 ch 。相當于增加了一節chain的長度。而ch在每個循環中輸出的第一個數,都是被 之前 的所有 質數 無法整除的數,即下一個質數。

輸出日志如下:

2
3
[2] filter out 4
5
[2] filter out 6
7
[2] filter out 8
[3] filter out 9
[2] filter out 10
11
[2] filter out 12
13
[2] filter out 14
[3] filter out 15
[2] filter out 16
17
[2] filter out 18
19
[2] filter out 20
[3] filter out 21
[2] filter out 22
23
[2] filter out 24
[5] filter out 25
[2] filter out 26
[3] filter out 27
[2] filter out 28
29

為了更直觀的展示整個過程,我用divan大神的 gotrace 工具畫出了goroutine的3d交互圖:

其中每個紅色豎線表示一個goroutine,時間軸是從上到下的,所以紅線越長表示goroutine持續時間越長,也說明它生成的越早。

可以看到,最早的一個goroutine獲得的數字是3,4,…… 29,因為2已經被輸出了,所以是3到29,然后下一個goroutine獲得的就是5,7,9,…… 29,因為3被輸出,而偶數都被過濾了。以此類推,最后輸出的就是前10個質數。

需要指出的是,這個算法并不是最高效的,但卻是非常優雅的。

關于gotrace的安裝和使用,請移步 這里 。我是根據他的方法給go1.6.3打了補丁后,就能使用了。

好了,下面我們 換些基礎的模式 講一下:

Ping-pong

顧名思義,就是由2個goroutine相互踢皮球組成的模式。盡管它非常簡單,但是卻方便我們理解go的并發編程概念。

代碼如下:

用一個int表示ball(球),管道表示table(桌子),兩個goroutine就是2個運動員, 分別編號為1和2。

func main() {
    var Ball int
    table := make(chan int)
    go player("2", table)
    go player("1", table)

// 首先把球放到“桌上”
table <- Ball
time.Sleep(1 * time.Second)
// 1s后比賽結束……
<-table

}

func player(id string, table chan int) { for { ball := <-table log.Printf("%s got ball[%d]\n", id, ball) time.Sleep(50 * time.Millisecond) log.Printf("%s bounceback ball[%d]\n", id, ball) ball++ table <- ball } }</code></pre>

輸出如下:

1 got ball[0]
1 bounceback ball[0]
2 got ball[1]
2 bounceback ball[1]
1 got ball[2]
1 bounceback ball[2]
2 got ball[3]
2 bounceback ball[3]
1 got ball[4]
1 bounceback ball[4]
2 got ball[5]
2 bounceback ball[5]

代碼簡潔易懂,很好理解(看不懂的同學請不要拍我)。

下面,我們增加一位選手,讓3個運動員一塊打球

go player("2", table)
    go player("3", table)
    go player("1", table)

這下子熱鬧了,輸出如下:

1 got ball[0]
1 bounceback ball[0]
2 got ball[1]
2 bounceback ball[1]
3 got ball[2]
3 bounceback ball[2]
1 got ball[3]
1 bounceback ball[3]
2 got ball[4]
2 bounceback ball[4]
3 got ball[5]
3 bounceback ball[5]
1 got ball[6]
1 bounceback ball[6]
2 got ball[7]
2 bounceback ball[7]
3 got ball[8]
3 bounceback ball[8]

看3個人有條不紊的相互擊球。此時處女座一定非常滿意,但是對于習慣了并發隨機性的程序員來說,這實在有些過于美好:為什么它們的順序如此協調,為什么1總是給2,2給3,3給1,而不是其他順序呢?

劃重點了啊:

The answer is because Go runtime holds waiting FIFO queue for receivers, that is goroutines ready to receive on the particular channel

即,對于接收channel內容的goroutines來說,go的runtime會把它們分配到一個 FIFO隊列 中,所以這些goroutines只能按照既定的順序接收channel的內容,而不會弄亂。所以即使創建上百個palyers,順序依然是固定的。go實在是太貼心了,有不有!

Fan-In

也叫“扇入”,應該是并發編程里面比較普通的一個模式了。fan-in會從多個管道讀取輸入,并匯總到一個channel輸出,形象的比喻如下圖:

示例代碼如下

import (
       "fmt"
       "math/rand"
       "os"
       "runtime/trace"
       "time"
)

func main() { trace.Start(os.Stderr) c := fanIn(boring(1), boring(2)) for i := 0; i < 10; i++ { fmt.Println(<-c) } fmt.Println("You're both boring; I'm leaving.") trace.Stop() }

func fanIn(input1, input2 <-chan int) <-chan int { c := make(chan int) go func() { for {c <- <-input1} }() go func() { for {c <- <-input2} }() return c }

func boring(msg int) <-chan int { c := make(chan int) go func() { // We launch the goroutine from inside the function. for i := 0; ; i++ { c <- msg1000 + i time.Sleep(time.Duration(rand.Intn(1e3)) time.Millisecond) } }() return c // Return the channel to the caller. }</code></pre>

輸出為:

2000

2001

1001

2002

1002

2003

1003

2004

1004

gotrace輸出為(注意這是兩次獨立的運行結果):

可以看到,兩次的結果都匯入了main線程,并且順序輸出,沒有丟失數據,也沒有死鎖。

當然,簡單的情況,用select也可以。

select設計的目的就是在channel中間通訊,誰的數據先到達,哪個case分支先執行。

c1 := boring(1)
c2 := boring(2)
for i := 0; i < 10; i++ {
    select {
    case v := <-c1:
        fmt.Println(v)
    case v := <-c2:
        fmt.Println(v)
    }
}

Workers

也叫FanOut(扇出),和扇入模式相反,工作模式是一個管道分發任務,多個goroutines來執行。

示例代碼如下:

import (
    "fmt"
    "os"
    "runtime/trace"
    "sync"
    "time"
)

func worker(ch <-chan int, wg sync.WaitGroup) { defer wg.Done() for { task, ok := <-ch if !ok { return } time.Sleep(20 time.Millisecond) fmt.Println("processing task", task) } }

func pool(wg *sync.WaitGroup, workers, tasks int) { ch := make(chan int)

for i := 0; i < workers; i++ {
    time.Sleep(1 * time.Millisecond)
    // spawn出很多worker線程
    go worker(ch, wg)
}

for i := 0; i < tasks; i++ {
    time.Sleep(10 * time.Millisecond)
    // 開始分發任務,被激活的workers開始工作了
    ch <- i
}

close(ch)

} func main() { trace.Start(os.Stderr) var wg sync.WaitGroup wg.Add(36) go pool(&wg, 36, 36) wg.Wait() trace.Stop() }</code></pre>

代碼略長,但是邏輯其實非常清晰。我在注釋中也稍作了說明。

注意( 劃重點了 ), close(ch) 在這里很關鍵,它確定了每個worker退出的節點。當channel中的內容為空,同時它已經被close時, task, ok := <- ch 返回的ok==false,此時通知worker退出,wg標記完成,當所有的worker都完成時,wg.Wait()完成,轉入下一行執行。

在golang中,main不會自動等待所有子進程完成,如果沒有退出檢查,main進程會閃退,所有的子進程也會隨之強制退出,所以在main里必須有退出檢測機制,前幾個例子我們使用的是time.Sleep和for循環,這里我們使用了WaitGroup。

gotrace結果如下:

圓柱體中心就是main進程中生成的pool進程,圍繞它的是36個worker進程。藍色箭頭表示pool每隔10ms分發的任務,它們都被worker處理了。

Servers

server模式和fan_out類似,只不過它的worker線程是按需生成的,并且工作處理完畢后就釋放。所以這種模式常應用到網站服務器上。在主進程中,有一個for循環,Accept函數一直阻塞著循環的進行,一旦有新的請求過來,Accept就會生成一個connection,然后主進程就創建一個子進程處理這個connection以及其他邏輯。

示例代碼如下:

import (
       "fmt"
       "net"
       "os"
       "runtime/trace"
       "time"
)

func handler(c net.Conn, ch chan int) { ch <- len(c.RemoteAddr().String())

time.Sleep(10 * time.Microsecond) c.Write([]byte("ok")) c.Close() }

func logger(ch chan int) { for { time.Sleep(1500 * time.Millisecond) fmt.Println(<-ch) } }

func server(l net.Listener, ch chan int) { for { c, err := l.Accept() if err != nil { continue } go handler(c, ch) } }

func main() { trace.Start(os.Stderr)

   l, err := net.Listen("tcp", ":5000")
   if err != nil {
       panic(err)
   }
   ch := make(chan int)
   go logger(ch)
   go server(l, ch)
   time.Sleep(10 * time.Second)
   trace.Stop()

}</code></pre>

可以看到,主進程生成了一個tcp連接,啟動了server和logger兩個子進程。server用來監聽外網的請求,一旦請求過來,就會生成一個handler進程,用來處理connection。同時,handler還會通過管道和logger通訊,logger負責異步記錄相應日志。

這個程序運行時的輸入需要模擬外部請求來產生,為此我寫了一個腳本:

#!/bin/sh
i=0
while [[ $i -lt 20 ]];
do

# 通過nc發起tcp請求。每秒請求一次
echo "hello "$i | nc localhost 5000
sleep 1
((++i))

done</code></pre>

運行時,先啟動這個腳本,然后啟動server或gotrace。

gotrace的運行結果如下:

可以看到,盡管程序運行了10s,但是只處理了6個請求。這是因為logger占用了管道太長時間,使得handler的運行時間也延長到了1.5s以上。

為了解決這個問題,我們正好借助上面介紹的Worker模式,提高logger的并發性。

Server + Worker

import (
           "fmt"
           "net"
           "os"
           "runtime/trace"
           "time"
)
func handler(c net.Conn, ch chan int) {
       ch <- 0
       time.Sleep(50 * time.Microsecond)
       c.Write([]byte("ok"))
       c.Close()
}

func logger(wch chan int) { for { fmt.Println(<-wch) // 這里主要耗時 time.Sleep(1500 * time.Millisecond) } }

func pool(ch chan int, n int) { wch := make(chan int) for i := 0; i < n; i++ { go logger(wch) } for { wch <- <-ch } }

func server(l net.Listener, ch chan int) { for { c, err := l.Accept() if err != nil { continue } go handler(c, ch) } }

func main() { trace.Start(os.Stderr)

   l, err := net.Listen("tcp", ":5000")
   if err != nil {
       panic(err)
   }
   ch := make(chan int)
   go pool(ch, 36)
   go server(l, ch)
   time.Sleep(10 * time.Second)
   trace.Stop()

}</code></pre>

其中pool函數跟上例類似,就是生成(spawn)很多worker,然后handle中生成的數據會先進入pool,由pool再分配給這些workers。

3D圖如下:

可以看到,此時server正好處理了10個請求。不再被logger拖延了。

Concurrency & Parallelism

注意我的題目是并發(concurrent)設計模式。那么并發和并行到底啥區別??

  • Concurrency: A condition that exists when at least two threads are making progress. A more generalized form of parallelism that can include time-slicing as a form of virtual parallelism.

    Concurrency(并發性):是一種廣義的并行。在concurrence的語境下,兩個線程/任務可以表面上看起來“像是”并行,但其實機器只有一個核,它們只是分享了時間塊。當然,在多核的情況下,它可以是并行的。

  • Parallelism(并行性):A condition that arises when at least two threads are executing simultaneously.

    這個就是狹義的并行,即線程、任務必須是同時進行的,否則不算parallelism。

 

來自:https://segmentfault.com/a/1190000007111208

 

 本文由用戶 zmoh7166 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!