Go并發編程基礎
原文出處: nada.kth.se 譯文出處: youngsterxyf(@夏永鋒_SJTU)
本文是一篇并發編程方面的入門文章,以Go語言編寫示例代碼,內容涵蓋:
- 運行期并發線程(goroutines)
- 基本的同步技術(管道和鎖)
- Go語言中基本的并發模式
- 死鎖和數據競爭
- 并行計算 </ul>
在開始閱讀本文之前,你應該知道如何編寫簡單的Go程序。如果你熟悉的是C/C++、Java或Python之類的語言,那么 Go語言之旅 能提供所有必要的背景知識。也許你還有興趣讀一讀 為C++程序員準備的Go語言教程 或 為Java程序員準備的Go語言教程。
Go允許使用go語句開啟一個新的運行期線程,即 goroutine,以一個不同的、新創建的goroutine來執行一個函數。同一個程序中的所有goroutine共享同一個地址空間。
Goroutine非常輕量,除了為之分配的棧空間,其所占用的內存空間微乎其微。并且其棧空間在開始時非常小,之后隨著堆存儲空間的按需分配或釋放而變化。內部實現上,goroutine會在多個操作系統線程上多路復用。如果一個goroutine阻塞了一個操作系統線程,例如:等待輸入,這個線程上的其他goroutine就會遷移到其他線程,這樣能繼續運行。開發者并不需要關心/擔心這些細節。
下面所示程序會輸出“Hello from main goroutine”。也可能會輸出“Hello from another goroutine”,具體依賴于兩個goroutine哪個先結束。
func main() { go fmt.Println("Hello from another goroutine") fmt.Println("Hello from main goroutine")// 至此,程序運行結束, // 所有活躍的goroutine被殺死
}</pre>
接下來的這個程序,多數情況下,會輸出“Hello from main goroutine”和“Hello from another goroutine”,輸出的順序不確定。但還有另一個可能性是:第二個goroutine運行得極其慢,在程序結束之前都沒來得及輸出相應的消息。
func main() { go fmt.Println("Hello from another goroutine") fmt.Println("Hello from main goroutine")time.Sleep(time.Second) // 等待1秒,等另一個goroutine結束
}</pre>
下面則是一個相對更加實際的示例,其中定義了一個函數使用并發來推遲觸發一個事件。
// 函數Publish在給定時間過期后打印text字符串到標準輸出 // 該函數并不會阻塞而是立即返回 func Publish(text string, delay time.Duration) { go func() { time.Sleep(delay) fmt.Println("BREAKING NEWS:", text) }() // 注意這里的括號。必須調用匿名函數 }你可能會這樣使用Publish函數:
func main() { Publish("A goroutine starts a new thread of execution.", 5*time.Second) fmt.Println("Let’s hope the news will published before I leave.")// 等待發布新聞 time.Sleep(10 * time.Second) fmt.Println("Ten seconds later: I’m leaving now.")
}</pre>
這個程序,絕大多數情況下,會輸出以下三行,順序固定,每行輸出之間相隔5秒。
$ go run publish1.go Let’s hope the news will published before I leave. BREAKING NEWS: A goroutine starts a new thread of execution. Ten seconds later: I’m leaving now.一般來說,通過睡眠的方式來編排線程之間相互等待是不太可能的。下一章節會介紹Go語言中的一種同步機制 – 管道,并演示如何使用管道讓一個goroutine等待另一個goroutine。
2. 管道(channel)
![]()
管道是Go語言的一個構件,提供一種機制用于兩個goroutine之間通過傳遞一個指定類型的值來同步運行和通訊。操作符<-用于指定管道的方向,發送或接收。如果未指定方向,則為雙向管道。
chan Sushi // 可用來發送和接收Sushi類型的值 chan<- float64 // 僅可用來發送float64類型的值 <-chan int // 僅可用來接收int類型的值管道是引用類型,基于make函數來分配。
ic := make(chan int) // 不帶緩沖的int類型管道 wc := make(chan *Work, 10) // 帶緩沖的Work類型指針管道如果通過管道發送一個值,則將<-作為二元操作符使用。通過管道接收一個值,則將其作為一元操作符使用:
ic <- 3 // 往管道發送3 work := <-wc // 從管道接收一個指向Work類型值的指針如果管道不帶緩沖,發送方會阻塞直到接收方從管道中接收了值。如果管道帶緩沖,發送方則會阻塞直到發送的值被拷貝到緩沖區內;如果緩沖區已滿,則意味著需要等待直到某個接收方獲取到一個值。接收方在有值可以接收之前會一直阻塞。
關閉管道(Close)
close 函數標志著不會再往某個管道發送值。在調用close之后,并且在之前發送的值都被接收后,接收操作會返回一個零值,不會阻塞。一個多返回值的接收操作會額外返回一個布爾值用來指示返回的值是否發送操作傳遞的。
ch := make(chan string) go func() { ch <- "Hello!" close(ch) }() fmt.Println(<-ch) // 輸出字符串"Hello!" fmt.Println(<-ch) // 輸出零值 - 空字符串"",不會阻塞 fmt.Println(<-ch) // 再次打印輸出空字符串"" v, ok := <-ch // 變量v的值為空字符串"",變量ok的值為false一個帶有range子句的for語句會依次讀取發往管道的值,直到該管道關閉:
func main() { // 譯注:要想運行該示例,需要先定義類型Sushi,如type Sushi string var ch <-chan Sushi = Producer() for s := range ch { fmt.Println("Consumed", s) } } func Producer() <-chan Sushi { ch := make(chan Sushi) go func(){ ch <- Sushi("海老握り") // Ebi nigiri ch <- Sushi("鮪とろ握り") // Toro nigiri close(ch) }() return ch }3. 同步
下一個示例中,我們讓Publish函數返回一個管道 – 用于在發布text變量值時廣播一條消息:
// 在給定時間過期時,Publish函數會打印text變量值到標準輸出 // 在text變量值發布后,該函數會關閉管道wait func Publish(text string, delay time.Duration) (wait <-chan struct{}) { ch := make(chan struct{}) go func() { time.Sleep(delay) fmt.Println("BREAKING NEWS:", text) close(ch) // 廣播 - 一個關閉的管道都會發送一個零值 }() return ch }注意:我們使用了一個空結構體的管道:struct{}。這明確地指明該管道僅用于發信號,而不是傳遞數據。
我們可能會這樣使用這個函數:
func main() { wait := Publish("Channels let goroutines communicate.", 5*time.Second) fmt.Println("Waiting for the news...") <-wait fmt.Println("The news is out, time to leave.") }這個程序會按指定的順序輸出以下三行內容。最后一行在新聞(news)一出就會立即輸出。
$ go run publish2.go Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. The news is out, time to leave.4. 死鎖
![]()
現在我們在Publish函數中引入一個bug:
func Publish(text string, delay time.Duration) (wait <-chan struct{}) { ch := make(chan struct{}) go func() { time.Sleep(delay) fmt.Println("BREAKING NEWS:", text) // 譯注:注意這里將close函數調用注釋掉了 //close(ch) }() return ch }主程序還是像之前一樣開始運行:輸出第一行,然后等待5秒,這時Publish函數開啟的goroutine會輸出突發新聞(breaking news),然后退出,留下主goroutine獨自等待。
func main() { wait := Publish("Channels let goroutines communicate.", 5*time.Second) fmt.Println("Waiting for the news...") // 譯注:注意下面這一句 <-wait fmt.Println("The news is out, time to leave.") }此刻之后,程序無法再繼續往下執行。眾所周知,這種情形即為死鎖。
死鎖是線程之間相互等待,其中任何一個都無法向前運行的情形。
Go語言對于運行時的死鎖檢測具備良好的支持。當沒有任何goroutine能夠往前執行的情形發生時,Go程序通常會提供詳細的錯誤信息。以下就是我們的問題程序的輸出:
Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() .../goroutineStop.go:11 +0xf6 goroutine 2 [syscall]: created by runtime.main .../go/src/pkg/runtime/proc.c:225 goroutine 4 [timer goroutine (idle)]: created by addtimer .../go/src/pkg/runtime/ztime_linux_amd64.c:73大多數情況下找出Go程序中造成死鎖的原因都比較容易,那么剩下的就是如何解決這個bug了。
5. 數據競爭(data race)
死鎖也許聽起來令人挺憂傷的,但伴隨并發編程真正災難性的錯誤其實是數據競爭,相當常見,也可能非常難于調試。
當兩個線程并發地訪問同一個變量,并且其中至少一個訪問是寫操作時,數據競爭就發生了。
下面的這個函數就有數據競爭問題,其行為是未定義的。例如,可能輸出數值1。代碼之后是一個可能性解釋,試圖搞清楚這一切是如何發生得。
func race() { wait := make(chan struct{}) n := 0 go func() { // 譯注:注意下面這一行 n++ // 一次訪問: 讀, 遞增, 寫 close(wait) }() // 譯注:注意下面這一行 n++ // 另一次沖突的訪問 <-wait fmt.Println(n) // 輸出:未指定 }代碼中的兩個goroutine(假設命名為g1和g2)參與了一次競爭,我們無法獲知操作會以何種順序發生。以下是諸多可能中的一種:
- g1從n中獲取值0
- g2從n中獲取值0
- g1將值從0增大到1
- g1將1寫到n
- g2將值從0增大到1
- g2將1寫到n
- 程序輸出 n 的值,當前為1
“數據競爭(data race)”這名字有點誤導的嫌疑。不僅操作的順序是未定義的,其實根本沒有任何保證(no guarantees whatsoever)。編譯器和硬件為了得到更好的性能,經常都會對代碼進行上下內外的順序變換。如果你看到一個線程處于中間行為狀態時,那么當時的場景可能就像下圖所示的一樣:
避免數據競爭的唯一方式是線程間同步訪問所有的共享可變數據。有幾種方式能夠實現這一目標。Go語言中,通常是使用管道或者鎖。(sync和sync/atomic包中還有更低層次的機制可供使用,但本文中不做討論)。
Go語言中,處理并發數據訪問的推薦方式是使用管道從一個goroutine中往下一個goroutine傳遞實際的數據。有格言說得好:“不要通過共享內存來通訊,而是通過通訊來共享內存”。
func sharingIsCaring() { ch := make(chan int) go func() { n := 0 // 僅為一個goroutine可見的局部變量. n++ ch <- n // 數據從一個goroutine離開... }() n := <-ch // ...然后安全到達另一個goroutine. n++ fmt.Println(n) // 輸出: 2 }
以上代碼中的管道肩負雙重責任 – 從一個goroutine將數據傳遞到另一個goroutine,并且起到同步的作用:發送方goroutine會等待另一個goroutine接收數據,接收方goroutine也會等待另一個goroutine發送數據。
Go語言內存模型 - 要保證一個goroutine中對一個變量的讀操作得到的值正好是另一個goroutine中對同一個變量寫操作產生的值,條件相當復雜,但goroutine之間只要通過管道來共享所有可變數據,那么就能遠離數據競爭了。
6. 互斥鎖
有時,通過顯式加鎖,而不是使用管道,來同步數據訪問,可能更加便捷。Go語言標準庫為這一目的提供了一個互斥鎖 - sync.Mutex。
要想這類加鎖起效的話,關鍵之處在于:所有對共享數據的訪問,不管讀寫,僅當goroutine持有鎖才能操作。一個goroutine出錯就足以破壞掉一個程序,引入數據競爭。
因此,應該設計一個自定義數據結構,具備明確的API,確保所有的同步都在數據結構內部完成。下例中,我們構建了一個安全、易于使用的并發數據結構,AtomicInt,用于存儲一個整型值。任意數量的goroutine都能通過Add和Value方法安全地訪問這個數值。
// AtomicInt是一個并發數據結構,持有一個整數值 // 該數據結構的零值為0 type AtomicInt struct { mu sync.Mutex // 鎖,一次僅能被一個goroutine持有。 n int } // Add方法作為一個原子操作將n加到AtomicInt func (a *AtomicInt) Add(n int) { a.mu.Lock() // 等待鎖釋放,然后持有它 a.n += n a.mu.Unlock() // 釋放鎖 } // Value方法返回a的值 func (a *AtomicInt) Value() int { a.mu.Lock() n := a.n a.mu.Unlock() return n } func lockItUp() { wait := make(chan struct{}) var n AtomicInt go func() { n.Add(1) // 一個訪問 close(wait) }() n.Add(1) // 另一個并發訪問 <-wait fmt.Println(n.Value()) // 輸出: 2 }
7. 檢測數據競爭
競爭有時非常難于檢測。下例中的這個函數有一個數據競爭問題,執行這個程序時會輸出55555。嘗試一下,也許你會得到一個不同的結果。(sync.WaitGroup是Go語言標準庫的一部分;用于等待一組goroutine結束運行。)
func race() { var wg sync.WaitGroup wg.Add(5) // 譯注:注意下面這行代碼中的i++ for i := 0; i < 5; i++ { go func() { // 注意下一行代碼會輸出什么?為什么? fmt.Print(i) // 6個goroutine共享變量i wg.Done() }() } wg.Wait() // 等待所有(5個)goroutine運行結束 fmt.Println() }
對于輸出55555,一個貌似合理的解釋是:執行i++的goroutine在其他goroutine執行打印語句之前就完成了5次i++操作。實際上變量i更新后的值為其他goroutine所見純屬巧合。
一個簡單的解決方案是:使用一個局部變量,然后當開啟新的goroutine時,將數值作為參數傳遞:
func correct() { var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { go func(n int) { // 使用局部變量 fmt.Print(n) wg.Done() }(i) } wg.Wait() fmt.Println() }
這次代碼就對了,程序會輸出期望的結果,如:24031。注意:goroutine之間的運行順序是不確定的。
仍舊使用閉包,但能夠避免數據競爭也是可能的,必須小心翼翼地讓每個goroutine使用一個獨有的變量。
func alsoCorrect() { var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { n := i // 為每個閉包創建一個獨有的變量 go func() { fmt.Print(n) wg.Done() }() } wg.Wait() fmt.Println() }
數據競爭自動檢測
一般來說,不太可能能夠自動檢測發現所有可能的數據競爭情況,但Go(從版本1.1開始)有一個強大的數據競爭檢測器。
這個工具用起來也很簡單:只要在使用go命令時加上-race標記即可。開啟檢測器運行上面的程序會給出清晰且信息量大的輸出:
$ go run -race raceClosure.go Race: ================== WARNING: DATA RACE Read by goroutine 2: main.func·001() ../raceClosure.go:22 +0x65 Previous write by goroutine 0: main.race() ../raceClosure.go:20 +0x19b main.main() ../raceClosure.go:10 +0x29 runtime.main() ../go/src/pkg/runtime/proc.c:248 +0x91 Goroutine 2 (running) created at: main.race() ../raceClosure.go:24 +0x18b main.main() ../raceClosure.go:10 +0x29 runtime.main() ../go/src/pkg/runtime/proc.c:248 +0x91 ================== 55555 Correct: 01234 Also correct: 01324 Found 1 data race(s) exit status 66
該工具發現一處數據競爭,包含:一個goroutine在第20行對一個變量進行寫操作,跟著另一個goroutine在第22行對同一個變量進行了未同步的讀操作。
注意:競爭檢測器只能發現在運行期確實發生的數據競爭(譯注:我也不太理解這話,請指導)
8. Select語句
select語句是Go語言并發工具集中的終極工具。select用于從一組可能的通訊中選擇一個進一步處理。如果任意一個通訊都可以進一步處理,則從中隨機選擇一個,執行對應的語句。否則,如果又沒有默認分支(default case),select語句則會阻塞,直到其中一個通訊完成。
以下是一個玩具示例,演示select語句如何用于實現一個隨機數生成器:
// RandomBits函數 返回一個管道,用于產生一個比特隨機序列 func RandomBits() <-chan int { ch := make(chan int) go func() { for { select { case ch <- 0: // 注意:分支沒有對應的處理語句 case ch <- 1: } } }() return ch }
下面是相對更加實際一點的例子:如何使用select語句為一個操作設置一個時間限制。代碼會輸出變量news的值或者超時消息,具體依賴于兩個接收語句哪個先執行:
select { case news := <-NewsAgency: fmt.Println(news) case <-time.After(time.Minute): fmt.Println("Time out: no news in one minute.") }
函數 time.After 是Go語言標準庫的一部分;它會在等待指定時間后將當前的時間發送到返回的管道中。
9. 綜合所有示例
花點時間認真研究一下這個示例。如果你完全理解,也就對Go語言中并發的應用方式有了全面的掌握。
這個程序演示了如何將管道用于被任意數量的goroutine發送和接收數據,也演示了如何將select語句用于從多個通訊中選擇一個。
func main() { people := []string{"Anna", "Bob", "Cody", "Dave", "Eva"} match := make(chan string, 1) // 為一個未匹配的發送操作提供空間 wg := new(sync.WaitGroup) wg.Add(len(people)) for _, name := range people { go Seek(name, match, wg) } wg.Wait() select { case name := <-match: fmt.Printf("No one received %s’s message.\n", name) default: // 沒有待處理的發送操作 } } // 函數Seek 發送一個name到match管道或從match管道接收一個peer,結束時通知wait group func Seek(name string, match chan string, wg *sync.WaitGroup) { select { case peer := <-match: fmt.Printf("%s sent a message to %s.\n", peer, name) case match <- name: // 等待某個goroutine接收我的消息 } wg.Done() }
示例輸出:
$ go run matching.go Cody sent a message to Bob. Anna sent a message to Eva. No one received Dave’s message.
10. 并行計算
并發的一個應用是將一個大的計算切分成一些工作單元,調度到不同的CPU上同時地計算。
將計算分布到多個CPU上更多是一門藝術,而不是一門科學。以下是一些經驗法則:
- 每個工作單元應該花費大約100微秒到1毫秒的時間用于計算。如果單元粒度太小,切分問題以及調度子問題的管理開銷可能就會太大。如果單元粒度太大,整個計算也許不得不等待一個慢的工作項結束。這種緩慢可能因為多種原因而產生,比如:調度、其他進程的中斷或者糟糕的內存布局。(注意:工作單元的數目是不依賴于CPU的數目的)
- 盡可能減小共享的數據量。并發寫操作的代價非常大,特別是如果goroutine運行在不同的CPU上。讀操作之間的數據共享則通常不會是個問題。
- 數據訪問盡量利用良好的局部性。如果數據能保持在緩存中,數據加載和存儲將會快得多得多,這對于寫操作也格外地重要。
下面的這個示例展示如何切分一個開銷很大的計算并將其分布在所有可用的CPU上進行計算。先看一下有待優化的代碼:
type Vector []float64 // 函數Convolve 計算 w = u * v,其中 w[k] = Σ u[i]*v[j], i + j = k // 先決條件:len(u) > 0, len(v) > 0 func Convolve(u, v Vector) (w Vector) { n := len(u) + len(v) - 1 w = make(Vector, n) for k := 0; k < n; k++ { w[k] = mul(u, v, k) } return } // 函數mul 返回 Σ u[i]*v[j], i + j = k. func mul(u, v Vector, k int) (res float64) { n := min(k+1, len(u)) j := min(k, len(v)-1) for i := k - j; i < n; i, j = i+1, j-1 { res += u[i] * v[j] } return }
思路很簡單:確定合適大小的工作單元,然后在不同的goroutine中執行每個工作單元。以下是并發版本的Convolve:
func Convolve(u, v Vector) (w Vector) { n := len(u) + len(v) - 1 w = make(Vector, n) // 將 w 切分成花費 ~100μs-1ms 用于計算的工作單元 size := max(1, 1<<20/n) wg := new(sync.WaitGroup) wg.Add(1 + (n-1)/size) for i := 0; i < n && i >= 0; i += size { // 整型溢出后 i < 0 j := i + size if j > n || j < 0 { // 整型溢出后 j < 0 j = n } // 這些goroutine共享內存,但是只讀 go func(i, j int) { for k := i; k < j; k++ { w[k] = mul(u, v, k) } wg.Done() }(i, j) } wg.Wait() return }
工作單元定義之后,通常情況下最好將調度工作交給運行時和操作系統。然而,對于Go 1.* 你也許需要告訴運行時希望多少個goroutine來同時地運行代碼。
func init() { numcpu := runtime.NumCPU() runtime.GOMAXPROCS(numcpu) // 嘗試使用所有可用的CPU }