設計Go API的管道使用原則

jopen 10年前發布 | 12K 次閱讀 Go語言

管道是并發安全的隊列,用于在Go的輕量級線程(Go協程)之間安全地傳遞消息。總的來講,這些原語是Go語言中最為稱道的特色功能之一。這種消息傳遞范式使得開發者可以以易于理解的語義和控制流來協調管理多線程并發任務,而這勝過使用回調函數或者共享內存。

即使管道如此強大,在公有的API中卻不常見。例如,我梳理過Go的標準庫,在145個包中有超過6000個公有的API。在這上千個API中,去重后,只有5個用到了管道。

在公有的API中使用管道時,如何折衷考慮和取舍,缺乏指導。“共有API”,我是指“任何實現者和使用者是不同的兩個人的編程接口”。這篇文章會深入講解,為如何在共有API中使用管道,提供一系列的原則和解釋。一些特例會在本章末尾討論。

原則 #1

API應該聲明管道的方向性。

例子

time.After

func After(d Duration) <-chan Time

signal.Notify

func Notify(c chan<- os.Signal, sig ...os.Signal)

盡管并不常用,Go允許指定一個管道的方向性。語言規范這么寫:

可選的<-操作符指定了管道的方向,發送或接收。如果沒有指定方向,那么管道就是雙向的。

關鍵在于API簽名中的方向操作符會被編譯器強制檢查

t := time.After(time.Second)
t <- time.Now()  // 會編譯失敗(send to receive-only type <-chan Time)

除了能夠被編譯器強制檢查安全性,方向操作符還能幫助API使用者理解數據的流動方向——只需要看一下類型簽名即可。

原則 #2

向一個管道發送無界數據流的API必須寫文檔解釋清楚在消費者消費不及時時API的行為。

例子

time.NewTicker

// NewTicker returns a new Ticker containing a channel that will send the
// time with a period specified by the duration argument.
// It adjusts the intervals or drops ticks to make up for slow receivers.
// ...
func NewTicker(d Duration) *Ticker {
    ...
}

signal.Notify

// Notify causes package signal to relay incoming signals to c.
// ...
// Package signal will not block sending to c
// ...

func Notify(c chan<- os.Signal, sig ...os.Signal) {

ssh.Conn.OpenChannel

// OpenChannel tries to open an channel.
// ...
// On success it returns the SSH Channel and a Go channel for
// incoming, out-of-band requests. The Go channel must be serviced, or
// the connection will hang.

OpenChannel(name string, data []byte) (Channel, <-chan *Request, error)

當一個API向一個管道發送無界數據流時,在實現API時面臨的問題是如果向管道發送數據會阻塞怎么辦。阻塞的原因可能是管道已經滿了或者管道是無緩沖的,沒有go協程準備好接收數據。針對不同的場景要選擇合適的行為,但是每個場景必須作出選擇。例如,ssh包選擇了阻塞,并且文檔寫明如果你不接受數據,連接就會被卡住。signal.Notify 和 time.Tick選擇不阻塞,直接丟棄數據。

不足的是,Go本身并沒有從類型或函數簽名角度提供方法指定默認行為。作為API的設計者,你必須在文檔中寫明行為,不然其行為就是不定的。然而,多數情況下我們都是API的使用者而不是設計者,所以我們可以反過來記這個原則,反過來就是一條警告信息:

對于通過一個管道向一個慢速的消費者發送無界數據的API,在沒有通讀API的文檔或者實現源碼之前,你不能確定API的行為。

原則 #3

向一個管道發送有界數據,同時這個管道是作為參數傳遞進來的API,必須用文檔寫明對于慢速消費者的行為。

不好的例子

rpc.Client.Go

func (client *Client) Go(serviceMethod string,
                         args interface{},
                         reply interface{},
                         done chan *Call
                         ) *Call

這個原則和第二個原則類似,不同點在于這個原則用于發送有界數據的API。不幸的是,在標準庫中沒有很好的例子。標準庫中唯一的API就是rpc.Client.Go,但它違背了我們的原則。文檔上這么寫:

Go異步的調用這個函數。它會返回代表著調用的Call數據結構。在調用完成時,done管道會通過返回同一個Call對象來觸發。如果done是空的,Go會分配一個新的管道;如果不空,done必須是有緩沖的,不然Go就會崩潰。

Go發送了有界數據(只有1,當遠程調用結束時)。但是注意到,由于管道是被當作參數傳遞到函數中的,所以它仍然存在慢速消費者問題。即使你必須傳一個帶緩沖的管道進來,如果管道已滿,向這個管道發送數據仍然可能會阻塞。文檔并沒有定義這種場景下的行為。需要我們來讀讀源碼了:

src/pkg/net/rpc/client.go

 func (call *Call) done() {
     select {
     case call.Done <- call:
     // ok
     default:
     // We don't want to block here.  It is the caller's responsibility to make
         // sure the channel has enough buffer space. See comment in Go().
         if debugLog {
             log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
         }
     }
}

噢!如果done管道沒有合適的緩沖,RPC的響應可能丟失了。

原則 #4

向一個管道發送無界數據流的API應該接受管道作為參數,而不是返回一個新的管道。

例子

signal.Notify

func Notify(c chan<- os.Signal, sig ...os.Signal)

ssh.NewClient

func NewClient(c Conn, chans <-chan NewChannel, reqs <-chan *Request) *Client

當我第一次看到signal.Notify這個API時,我很疑惑,“為什么它接收一個管道作為輸入而不是直接返回一個管道給我用?”“使用這個API需要調用方分配一個管道,難道API就不能替我們做么,像下面這樣?”

func Notify(sig ...os.Signal) <-chan os.Signal

文檔幫助我們理解為什么這不是好的選擇:

signal包向c發送數據時并不會阻塞:調用方必須保證c有足夠的緩沖空間來跟得上潛在的信號速度

signal.Notify接收管道作為參數,因為它把緩沖空間的控制權交給了調用方。這使得調用方可以選擇,在處理一個信號時,可以安全的忽略多少信號,這需要和緩存這些信號的內存開銷作折衷考慮。

緩沖大小的控制在高吞吐系統中尤為重要。設想一個高吞吐的發布訂閱系統的這樣一個接口:

func Subscribe(topic string, msgs chan<- Msg)

往管道中發送越多的消息,管道同步稱為性能瓶頸的可能性越大。由于API允許調用方創建管道,調用方需要考慮緩沖,進而性能可以由調用方控制。這是一種更靈活的設計。

如果僅僅是控制緩沖的大小,我們可能會爭論如下的API就足夠了:

func Notify(sig ...os.Signal, bufSize int) <-chan os.Signal

這樣設計,管道作為參數還是必須的,因為這樣允許調用方使用一個管道動態的處理不同類型的信號。這樣設計為調用方提供了更多的程序結構和性能上的靈活性。作為一個假想實驗,讓我們用Subscribe API來構建需求。訂閱newcustomer管道,并對于每一條消息,為消費者訂閱其主題。如果API允許我們傳遞接收管道,我們可以這樣寫:

msgs := make(chan Msg, 128)

Subscribe("newcustomer", msgs)
for m := range msgs {
    switch m.Topic {
    case "newcustomer":
        Subscribe(msg.Payload, msgs)
    default:
        handleCustomerMessage(m)
}

但是,如果管道被返回了,調用方不得不為每一個訂閱啟動一個單獨的go協程。這在任何復用場景都會帶來額外的內存和同步開銷:

for m := range Subscribe("newcustomer") {
    go subCustomer(m.Payload)
}

func subCustomer(topic string) {
    for m := range Subscribe(topic) {
        handleCustomerMessage(m)
    }
}

原則 #5

發送有界數據的API可以通過返回一個合適大小緩沖的管道來達到目的。

例子:

http.CloseNotifier

type CloseNotifier interface {
        // CloseNotify returns a channel that receives a single value
        // when the client connection has gone away.
        CloseNotify() <-chan bool
}

time.After

func After(d Duration) <-chan Time

當API向一個管道發送有界數據時,可以返回一個擁有容納全部數據的緩沖空間的管道。這個要返回的管道的方向性標識保證了調用方必須遵守約定。CloseNotify 和After返回的管道 都利用了這一點。

同時,需要注意到,通過允許調用方傳遞一個管道來接收數據,這些調用可能會更靈活。但需要處理當管道滿了的時候(原則3)。例如,另外一個可選的,更靈活的CloseNotifier:

type CloseNotifier interface {
        // CloseNotify sends a single value with the ResponseWriter whose
        // underlying connection has gone away.
        CloseNotify(chan<- http.ResponseWriter)
}

但是這種額外的靈活性帶來的開銷并不值得關注,因為單一的調用方很少會同時等待多個關閉通知。畢竟,關閉通知只有在某個連接上下文內才有效。不同的連接一般都是相互獨立的。

特例

一些API打破了我們的原則,需要仔細分析。

原則 #1 的特例

API需要聲明管的方向性。

例子

rpc.Client.Go

傳過來的done管道沒有方向性標識符:

func (client *Client) Go(serviceMethod string,
                         args interface{},
                         reply interface{},
                         done chan *Call
                         ) *Call

直觀上看,這樣做是因為done管道是作為Call結構體的一部分返回的。

type Call struct {
        // ...
        Done          chan *Call  // Strobes when call is complete.
}

這種靈活性是需要的,這樣允許在你傳nil時分配一個done管道出來。如果堅持原則1,就需要從Call結構中去除done并且聲明兩個函數:

func (c *Client) Go(method string,
                    args interface{},
                    reply interface{}
                    ) (*Call, <-chan *Call)

func (c *Client) GoEx(method string,
                      args interface{},
                      reply interface{},
                      done chan<- *Call
                      ) *Call

原則 #4 的特例

向管道發送無界數據流的API需要接收管道作為參數,而不是返回一個新的管道。

例子

go.crypto/ssh

func NewClientConn(c net.Conn, addr string, config *ClientConfig)
    (Conn, <-chan NewChannel, <-chan *Request, error)

time.Tick

func Tick(d Duration) <-chan Time

go.crypto/ssh包幾乎在所有的地方都返回了無界的數據流管道。ssh.NewClientConn只是其中的一個。給調用者更多控制權和靈活性的API應該是這樣:

func NewClientConn(c net.Conn,
                   addr string,
                   config *ClientConfig,
                   channels chan<- NewChannel,
                   reqs chan<- *Request
                   ) (Conn, error)

time.Tick也違反了這個原則,但是易于理解。我們很少會創建非常多的計時器,通常都是獨立的處理不同的計時器。這個例子中緩沖也沒太大意義。

第二部分:那些原本可能使用的管道

這篇文章是一篇長文,所以我準備分成兩部分講。接下來會提很多問題,為什么標準庫中可以使用管的地方卻沒有用管道。例如,http.Serve 返回了一個永不結束的等待被處理的請求流,為什么用了回調函數而不是將這些請求發送到一個處理管道中?第二部分會介紹更多!

原文鏈接: Alan Shreve   翻譯: 伯樂在線 - Codefor
譯文鏈接: http://blog.jobbole.com/73700/

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!