Go 語言中漂亮的并發特性

jopen 11年前發布 | 117K 次閱讀 Go 語言 Google Go/Golang開發

時不時地學習一門新的編程語言對你來說是大有裨益的,哪怕這門語言并不那么成功甚至有些過時。用新的編程語言來解決老的編程問題會迫使你對自己的某些觀點、方法甚至是習慣進行重新思考。

我喜歡嘗試新的東西,尤其是編程語言。但是如果只編寫了“Hellow World"或者實現了Fibonacci數列,你通常會對這門語言毫無感覺、甚至覺得索然寡味。你應該去試著實現Eratosthenes篩法,嘗試一些數據結構或者感受一下它的性能表現。但是我想要的是更加現實的、甚至能夠為以后復用的代碼。所以不久前我為自己出了個題目,來幫助我只通過幾百行代碼就可以體會到新的編程語言的感覺。

這個項目涉及一個語言中的幾個基本元素:字符串,文件,網絡I/O,當然還有并發性。這個項目叫做TCP/IP代理(或者你可以叫做網絡調試器)。它的理念是,你擁有一個TCP/IP監聽器(單線程或多線程)監聽一個指定的端口并接受外來的連接。當他接受到一個接入請求,它就會建立一個連接,并且在遠程客戶端與服務器之間做雙向數據傳輸。另外,這個代理可以記錄日志,以不同的格式來記錄,以便日后做分析。

當我需要這個工具的時候,我不再需要到處去找。每次涉及網絡編程的時候,這樣一個工具是必須的。我已經使用過不同的語言來實現這個工具,包括C, C++, Perl, PHP。最近的兩個實現是使用python 和 Erlang。它代表著我正在尋找的答案。

我們可以再具體說說我們的需求。這個應用必須支持同時建立多個連接。對于每一個連接,它需要通過3種途徑記錄數據:一個以十六進制順序記錄來自于雙向傳輸的數據日志,兩個用于分開記錄進和出的數據流的二進制日志。

我們在這篇文章中將實現這個程序,我們使用的語言是Go。Go的作者聲稱,Go骨子里就滲透著并發和多線程特性。我想把它帶到我們的世界。

如果我通過C++來實現,我可能就需要main監聽線程,和每個連接的線程。所以,單獨一個連接就可以通過一個線程,而得到完整的服務(I/O和日志記錄)。

以下是我在Go實現中用于服務每個連接的線程:

  • 一個雙向十六進制日志記錄線程
  • 兩個以二進制記錄進和出的數據流的線程
  • 兩個用于在服務器和遠程主機間傳輸數據的線程
  • </ul>

    總共5個線程。

    5個線程在為一個獨立的連接服務。我實現了這些線程,不是為了多線程本身,而是因為Go鼓勵多線程,而C++不鼓勵(及時最新的C++x11標準也類似)。多線程在Go中是如此自然而簡單。我在Go語言中實現TCP/IP代理,沒有使用鎖和條件變量。同步由Go的channel方式進行優雅的管理。

    好吧,這里有源代碼,包含解釋。如果你不熟悉Go,注釋會有幫助。我的本意不僅關注于這個程序的功能,也關注Go語言本身。

    現在開始

    從 2-11 行我們聲明了一些要用到的包。值得注意的是,如果引入的包沒有用到,Go 視之為一個錯誤并且強制刪除沒用的聲明(在C++項目中,你最后完成了,記得什么時候清理過 STL 的頭文件嗎?)

    package main

    import (     "encoding/hex"     "flag"     "fmt"     "net"     "os"     "runtime"     "strings"     "time" )</pre>從 12-16行我們聲明了一些代表命令行標志的全局變量。后面,我們會看到如何解析他們。

    var (
        host *string = flag.String("host", "","target host or address")
        port *string = flag.String("port", "0", "target port")
        listen_port *string = flag.String("listen_port", "0","listen port")
    )

    從 17-20 行我們看到Go語言中可變參數函數的語法結構。

    </tr> </tbody> </table>

    從 21-28 行有兩個函數分別啟動十六進制數據日志和二進制流日志。他們唯一的區別是日志名稱不同。

     
    func die(format string, v ...interface{}) {
         os.Stderr.WriteString(fmt.Sprintf(format+"\n", v...))
         os.Exit(1)
    }

    </tr> </tbody> </table>

    在第 29-43 行,你能看到 Go 真正的樂趣。函數 logger_loop 創建一個日志文件,然無限循環的寫入(35-42行)。在第36行,代碼等待來自通道 data 的消息。在第34行有一個很很有趣的技巧。操作符 defer 允許我們定義一段在函數功能結束時執行的代碼(類似Java中的 finally)。如果接受到空數據,函數將退出。

    func logger_loop(data chan []byte, log_name string) {
        f, err := os.Create(log_name)
        if err != nil {
            die("Unable to create file %s, %v\n", log_name, err)
        }
        defer f.Close()
        for {
            b := <-data
            if len(b) == 0 {
                 break
            }
            f.Write(b)
            f.Sync()
        }
    }
    func format_time(t time.Time) string {
        return t.Format("2006.01.02-15.04.05")
    }
    func printable_addr(a net.Addr) string {
        return strings.Replace(a.String(), ":", "-", -1)
    }
    type Channel struct {
        from, to net.Conn
        logger, binary_logger chan []byte
        ack chan bool
    }

    在第55-88行中有一個函數用來從源套接字讀取數據并寫入到日志中,最后將其發送到目的套接字。對于每個連接,有兩個pass_through函數的實例,在相反方向的本地和遠程之間復制數據。當一個I / O錯誤發生時,它被處理成連接的斷開。最后,在第79行此函數中目標套接字發送確認信號到主線程,信號終止。

    func pass_through(c *Channel){ 
        from_peer := printable_addr(c.from.LocalAddr())
        to_peer := printable_addr(c.to.LocalAddr())

        b := make([]byte, 10240)    offset := 0    packet_n := 0    for {       n, err := c.from.Read(b)        if err != nil {           c.logger <- []byte(fmt.Sprintf("Disconnected from %s\n",from_peer))           break       }        if n > 0 {             c.logger <- []byte(fmt.Sprintf("Received (#%d, %08X)%d bytes from %s\n",packet_n, offset, n, from_peer))           c.logger <- []byte(hex.Dump(b[:n]))            c.binary_logger <- b[:n]            c.to.Write(b[:n])            c.logger <- []byte(fmt.Sprintf("Sent (#%d) to %s\n",packet_n, to_peer))            offset += n            packet_n += 1        }     }     c.from.Close()     c.to.Close()     c.ack <- true }</pre>

    在81-107行有一個負責處理實際連接的函數。這個函數連接遠程的socket(第82行),對連接計時(第88行,第101-103行),開啟日志(第93-95行),最后啟動兩個傳輸數據的線程(第97-98行)。只要兩個連接都還可用,pass_through就會一直執行下去。在第99-100行我們等待數據傳輸線程返回的確認信息。在第104-106行我們關閉日志。

    func process_connection(local net.Conn, conn_n int, target string) {
        remote, err := net.Dial("tcp", target)
        if err != nil {
            fmt.Printf("Unable to connect to %s, %v\n", target, err)
        }

    local_info := printable_addr(remote.LocalAddr())
    remote_info := printable_addr(remote.RemoteAddr())
    
    started := time.Now()
    
    logger := make(chan []byte)
    from_logger := make(chan []byte)
    to_logger := make(chan []byte)
    ack := make(chan bool)
    
    go connection_logger(logger, conn_n, local_info, remote_info)
    go binary_logger(from_logger, conn_n, local_info)
    go binary_logger(to_logger, conn_n, remote_info)
    
    logger <- []byte(fmt.Sprintf("Connected to %s at %s\n",
    target, format_time(started)))
    
    go pass_through(&Channel{remote, local, logger, to_logger, ack})
    go pass_through(&Channel{local, remote, logger, from_logger, ack})
    <-ack // Make sure that the both copiers gracefully finish.
    <-ack //
    
    finished := time.Now()
    duration := finished.Sub(started)
    logger <- []byte(fmt.Sprintf("Finished at %s, duration %s\n",
    format_time(started), duration.String()))
    
    logger <- []byte{} // Stop logger
    from_logger <- []byte{} // Stop "from" binary logger
    to_logger <- []byte{} // Stop "to" binary logger
    

    }</pre>

    在第108-132行是運行TCP / IP偵聽的主要入口函數。在第109行,我們要求Go運行時使用所有的物理可用CPU。

    func main() {
        runtime.GOMAXPROCS(runtime.NumCPU())
        flag.Parse()
        if flag.NFlag() != 3 {
            fmt.Printf("usage: gotcpspy -host target_host -port target_port -listen_post=local_port\n")
            flag.PrintDefaults()
            os.Exit(1)
        }
        target := net.JoinHostPort(*host, *port)
        fmt.Printf("Start listening on port %s and forwarding data to %s\n",*listen_port, target)
        ln, err := net.Listen("tcp", ":"+*listen_port)
        if err != nil {
            fmt.Printf("Unable to start listener, %v\n", err)
            os.Exit(1)
        }
        conn_n := 1
        for {
            if conn, err := ln.Accept(); err ==nil {
                go process_connection(conn, conn_n, target)
                conn_n += 1
            } else {
             fmt.Printf("Accept failed, %v\n", err)        }
        }
    }

    這個程序只有132行。請注意:我們只使用了標準庫。
    現在,我們已經準備好運行:

    go run gotcpspy.go -host pop.yandex.ru -port 110 -local_port 8080
    它應該會打印出:

    Start listening on port 8080 and forwarding data to pop.yandex.ru:110
    然后你就可以在另一個窗口中運行:

    telnet localhost 8080
    接下來輸入,例如用戶 test[ENTER]鍵和密碼(空)[ENTER] 。三個日志文件即將被創建(當然在每人 不同 情況下 生成的時間戳不同 )。

    雙向十六進制日志 log-2012.04.20-19.55.17-0001-192.168.1.41 -49544-213.180.204.37-110.log:

    Connected to pop.yandex.ru:110 at 2012.04.20-19.55.17
    Received (#0, 00000000) 38 bytes from 192.168.1.41-49544
    00000000 2b 4f 4b 20 50 4f 50 20 59 61 21 20 76 31 2e 30
    |+OK POP Ya! v1.0|
    00000010 2e 30 6e 61 40 32 36 20 48 74 6a 4a 69 74 63 50
    |.0na@26 HtjJitcP|
    00000020 52 75 51 31 0d 0a
    |RuQ1..|
    Sent (#0) to [--1]-8080
    Received (#0, 00000000) 11 bytes from [--1]-8080
    00000000 55 53 45 52 20 74 65 73 74 0d 0a
    |USER test..|
    Sent (#0) to 192.168.1.41-49544
    Received (#1, 00000026) 23 bytes from 192.168.1.41-49544
    00000000 2b 4f 4b 20 70 61 73 73 77 6f 72 64 2c 20 70 6c
    |+OK password, pl|
    00000010 65 61 73 65 2e 0d 0a
    |ease...|
    Sent (#1) to [--1]-8080
    Received (#1, 0000000B) 11 bytes from [--1]-8080
    00000000 50 41 53 53 20 6e 6f 6e 65 0d 0a
    |PASS none..|
    Sent (#1) to 192.168.1.41-49544
    Received (#2, 0000003D) 72 bytes from 192.168.1.41-49544
    00000000 2d 45 52 52 20 5b 41 55 54 48 5d 20 6c 6f 67 69
    |-ERR [AUTH] logi|
    00000010 6e 20 66 61 69 6c 75 72 65 20 6f 72 20 50 4f 50
    |n failure or POP|
    00000020 33 20 64 69 73 61 62 6c 65 64 2c 20 74 72 79 20
    |3 disabled, try |
    00000030 6c 61 74 65 72 2e 20 73 63 3d 48 74 6a 4a 69 74
    |later. sc=HtjJit|
    00000040 63 50 52 75 51 31 0d 0a
    |cPRuQ1..|
    Sent (#2) to [--1]-8080
    Disconnected from 192.168.1.41-49544
    Disconnected from [--1]-8080
    Finished at 2012.04.20-19.55.17, duration 5.253979s

    傳出數據二進制日志 log-binary-2012.04.20-19.55.17-0001 -192.168.1.41-49544.log:

    USER test
    PASS none

    傳入數據二進制日志 log-binary-2012.04.20-19.55.17 -0001-213.180.204.37-110.log:

    +OK POP Ya! v1.0.0na@26 HtjJitcPRuQ1
    +OK password, please.

    -ERR [AUTH] login failure or POP3 disabled, try later. sc=HtjJitcPRuQ1</pre>

    看起來有用,現在我們試試下載更大的二進制文件來測試性能,先直接下載,再通過代理。

    直接下載(文件大小約72MB):

    time wget http://www.erlang.org/download/otp_src_R15B01.tar.gz

    ...

    Saving to: `otp_src_R15B01.tar.gz'

    ...

    real 1m2.819s</pre>

    現在,試試通過代理下載:

    go run gotcpspy.go -host=www.erlang.org -port=80 -listen_port=8080

    下載:

    time wget http://localhost:8080/download/otp_src_R15B01.tar.gz

    ...

    Saving to: `otp_src_R15B01.tar.gz.1'

    ...

    real 0m56.209s</pre>

    比較一下結果:

    diff otp_src_R15B01.tar.gz otp_src_R15B01.tar.gz.1

    兩者匹配,程序運行正確。

    現在來看看性能,我在我的 Mac Air 上將這個實驗重復了幾次。驚訝的是,對我來說,通過代理下載居然比直接下載還要快一些。在上面的實驗中:1m2819s (直接) VS. 0m.56209s (代理)。我能想到的唯一解釋就是 wget是單線程的,它在一個線程內復用輸入和輸出流。反過來,代理以獨立線程處理每一個流,可能因此速度稍稍快一些。此中差異甚微,幾乎不能察覺,或許在另一臺電腦或另一個網路中,這點差異就會完全消失。主要的觀察是,盡管通過代理下載會額外產生龐大的日志開銷,下載速度并不會因此減慢。

    綜上所述,我希望你從簡單和清晰的角度來看這個程序。我在上面已經指出,但我想再次強調:我已經開始逐漸在這個應用程序中使用線程(goroutine)。問題的本質只需我在一個正在運行的連接中標記并發任務,然后利用Go的易用性和安全性的并發機制已經很好地實現了它,而且我 最終不用在效率與復雜的并發(和調試的難度)顧此失彼。

    有時候同意一個簡單的問題只需輸入比特和字節, 你唯一關心的是代碼的線性效率。但你在并發,多線程處理能力中遇到的逐漸增多的問題將成為關鍵因素,而對于這種應用,Go的魅力即將閃耀。

    我希望作為一個有代表性的例子來炫耀Go的 方便甚至并發美。

     本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
     轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
     本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!
     
    func connection_logger(data chan []byte, conn_n int,
        local_info, remote_info string) {
        log_name := fmt.Sprintf("log-%s-%04d-%s-%s.log",format_time(time.Now()), conn_n, local_info, remote_info)
        logger_loop(data, log_name)
    }
    func binary_logger(data chan []byte, conn_n int, peer string) {
        log_name := fmt.Sprintf("log-binary-%s-%04d-%s.log",
        format_time(time.Now()), conn_n, peer)
        logger_loop(data, log_name)
    }

sesese色