一起用 Golang 之 Go 程序的套路
系統性地介紹golang基礎的資料實在太多了,這里不再一一贅述。本文的思路是從另一個角度來由淺入深地探究下Go程序的套路。畢竟紙上得來終覺淺,所以,能動手就不要動口。有時候幾天不寫代碼,突然間有一天投入進來做個東西,才恍然發覺,也只有敲代碼的時候,才能找回迷失的自己,那可以忘掉一切的不開心。
Hello world
package main
import (
"fmt"
)
func main() {
fmt.Println("hello world")
}
go程序結構從整體上來說就是這樣的,第一行看起來這一定就是包頭聲明了,程序以包為單位,一個文件夾是一個包,一個包下可能有多個文件,但是包名都是同一個。相對C/C++程序的include來說,這里是import,后面跟的就是別的包名,一個包里定義的變量或類型,本包內都可見,若首字母大小,則可以被導出。如果引入了程序里不使用的包,編譯會報錯,報錯,錯。聲明不使用的變量也一樣,對,會報錯。這里行尾沒有分號,左大括號必須那樣放,縮進也不用你操心等等,編碼風格中的很多問題在這里都不再是問題,是的,go fmt幫你都搞定了,所以你看絕大部分go程序風格都好接近的。寫一段時間代碼后,你會發現,這種風格確實簡單,干凈利落。
本文重點
通過一些概念的學習和介紹,設計并實現個線程池,相信很多地方都可能用到這種模型或各種變形。
變量
變量的聲明、定義、賦值、指針等不想啰嗦了,去別的地方學吧。
結構體
我們先來定義一個結構體吧
package package1
type User struct {
Name string
addr int
age int
}
你一定注意到了,Name首字母是大寫的,在package2包中,import package1后就可以通過user.Name訪問Name成員了,Name是被導出的。但addr和age在package2中就不能直接訪問了,這倆沒有被導出,只能在package1包中被直接訪問,也就是私有的。那如何在package2中獲取沒有被導出的成員呢?我們來看下方法。
方法
func (u User) GetAge() string {
return u.age
}
func(u *User) SetAge(age int){
u.age = age
}
方法的使用和C++或者Java都很像的。下面代碼段中user的類型是*User,你會發現,無論方法的接收者是對象還是指針,方法調用時都只用.,而代表指針的->已經不在了。
user := &User{
Name: name,
addr: addr,
age: age,
}
user.SetAge(100)
fmt.Println(user.GetAge())
還有常用的構造對象的方式是這樣的
func NewUser(name string, addr string, age int) *User {
return &User{
Name: name,
addr: addr,
age: age,
}
}
user := new(User)
user := &User{}//與前者等價
user := User{}
組合與嵌套
Go中沒有繼承,沒有了多態,也沒有了模板。爭論已久的繼承與組合問題,在這里也不是問題了,因為已經沒得選擇了。比如我想實現個線程安全的整型(假設只用++和--),可能這么來做
type safepending struct {
pending int
mutex sync.RWMutex
}
func (s *safepending) Inc() {
s.mutex.Lock()
s.pending++
s.mutex.Unlock()
}
func (s *safepending) Dec() {
s.mutex.Lock()
s.pending--
s.mutex.Unlock()
}
func (s *safepending) Get() int {
s.mutex.RLock()
n := s.pending
s.mutex.RUnlock()
return n
}
也可以用嵌套寫法
type safepending struct {
pending int
sync.RWMutex
}
func (s *safepending) Inc() {
s.Lock()
s.pending++
s.Unlock()
}
func (s *safepending) Dec() {
s.Lock()
s.pending--
s.Unlock()
}
func (s *safepending) Get() int {
s.RLock()
n := s.pending
s.RUnlock()
return n
}
這樣safepending類型將直接擁有sync.RWMutex類型中的所有屬性,好方便的寫法。
interface
一個interface類型就是一個方法集,如果其他類型實現了interface類型中所有的接口,那我們就可以說這個類型實現了interface類型。舉個例子:空接口interface{}包含的方法集是空,也就可以說任何類型都實現了它,也就是說interface{}可以代表任何類型,類型直接的轉換看下邊的例子吧。
實現一個小頂堆
首先定義一個worker結構體, worker對象中存放很多待處理的request,pinding代表待處理的request數量,以worker為元素,實現一個小頂堆,每次Pop操作都返回負載最低的一個worker。
golang標準庫中提供了heap結構的容器,我們僅需要實現幾個方法,就可以實現一個堆類型的數據結構了,使用時只需要調用標準庫中提供的Init初始化接口、Pop接口、Push接口,就可以得到我們想要的結果。我們要實現的方法有Len、Less、Swap、Push、Pop,請看下邊具體代碼。另外值得一提的是,山楂君也是通過標準庫中提供的例子學習到的這個知識點。
type Request struct {
fn func() int
data []byte
op int
c chan int
}
type Worker struct {
req chan Request
pending int
index int
done chan struct{}
}
type Pool []*Worker
func (p Pool) Len() int {
return len(p)
}
func (p Pool) Less(i, j int) bool {
return p[i].pending < p[j].pending
}
func (p Pool) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
p[i].index = i
p[j].index = j
}
func (p *Pool) Push(x interface{}) {
n := len(*p)
item := x.(*Worker)
item.index = n
*p = append(*p, item)
}
func (p *Pool) Pop() interface{} {
old := *p
n := len(*p)
item := old[n-1]
//item.index = -1
*p = old[:n-1]
return item
}
pool的使用
package main
import (
"container/heap"
"log"
"math/rand"
)
var (
MaxWorks = 10000
MaxQueue = 1000
)
func main() {
pool := new(Pool)
for i := 0; i < 4; i++ {
work := &Worker{
req: make(chan Request, MaxQueue),
pending: rand.Intn(100),
index: i,
}
log.Println("pengding", work.pending, "i", i)
heap.Push(pool, work)
}
heap.Init(pool)
log.Println("init heap success")
work := &Worker{
req: make(chan Request, MaxQueue),
pending: 50,
index: 4,
}
heap.Push(pool, work)
log.Println("Push worker: pending", work.pending)
for pool.Len() > 0 {
worker := heap.Pop(pool).(*Worker)
log.Println("Pop worker:index", worker.index, "pending", worker.pending)
}
}
程序的運行結果如下,可以看到每次Pop的結果都返回一個pending值最小的一個work元素。
2017/03/11 12:46:59 pengding 81 i 0
2017/03/11 12:46:59 pengding 87 i 1
2017/03/11 12:46:59 pengding 47 i 2
2017/03/11 12:46:59 pengding 59 i 3
2017/03/11 12:46:59 init heap success
2017/03/11 12:46:59 Push worker: pending 50
2017/03/11 12:46:59 Pop worker:index 4 pending 47
2017/03/11 12:46:59 Pop worker:index 3 pending 50
2017/03/11 12:46:59 Pop worker:index 2 pending 59
2017/03/11 12:46:59 Pop worker:index 1 pending 81
2017/03/11 12:46:59 Pop worker:index 0 pending 87
細心的你肯能會發現,不是work么,怎么沒有goroutine去跑任務?是的山楂君這里僅是演示了小頂堆的構建與使用,至于如何用goroutine去跑任務,自己先思考一下吧。
其實加上類似于下邊這樣的代碼就可以了
func (w *Worker) Stop() {
w.done <- struct{}{}
}
func (w *Worker) Run() {
go func() {
for {
select {
case req := <-w.req:
req.c <- req.fn()
case <-w.done:
break
}
}
}()
}
golang的并發
golang中的并發機制很簡單 ,掌握好goroutine、channel以及某些程序設計套路,就能用的很好。當然,并發程序設計中存在的一切問題與語言無關,只是每種語言中基礎設施對此支持的程度不一,Go程序中同樣都要小心。
goroutine
They're called goroutines because the existing terms—threads, coroutines, processes, and so on—convey inaccurate connotations. A goroutine has a simple model: it is a function executing concurrently with other goroutines in the same address space. It is lightweight, costing little more than the allocation of stack space. And the stacks start small, so they are cheap, and grow by allocating (and freeing) heap storage as required.
Goroutines are multiplexed onto multiple OS threads so if one should block, such as while waiting for I/O, others continue to run. Their design hides many of the complexities of thread creation and management.
Prefix a function or method call with the go keyword to run the call in a new goroutine. When the call completes, the goroutine exits, silently. (The effect is similar to the Unix shell's & notation for running a command in the background.)
啟動一個goroutine,用法很簡單:
go DoSomething()
channel
看channel的描述:
A channel provides a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type. The value of an uninitialized channel is nil.
簡而言之,就是提供了goroutine之間的同步與通信機制。
共享內存?OR 通信?
Don't communicate by sharing memory; share memory by communicating
這就是Go程序中很重要的一種程序套路。拿一個具體的小應用場景來說吧:一個Map類型的數據結構,其增刪改查操作可能在多個線程中進行,我們會用什么樣的方案來實現呢?
- 增刪改查操作時加鎖
- 實現一個線程安全的Map類型
- 增刪改查操作限定在線程T中,其他線程如果想進行增刪改操作,統一發消息給線程T,由線程T來進行增刪操作(假設其他線程沒有Map的查詢操作)
對于方案3其實就是對Go程序這種套路的小應用,這種思想當然和語言無關,但是在Go語言中通過“通信”來共享內存的思路非常容易實現,有原生支持的goroutine、channel、select、gc等基礎設施,也許你會有"大消息"傳遞場景下的性能顧慮,但channel是支持引用類型的傳遞的,且會自動幫你進行垃圾回收,一個大結構體的引用類型實際上可能才占用了十幾個字節的空間。這實在是省去了山楂君很多的功夫。看Go程序的具體做法:
type job struct {
// something
}
type jobPair struct {
key string
value *job
}
type worker struct {
jobqueue map[string]*job // key:UserName
jobadd chan *jobPair
}
// 并不是真正的map insert操作,僅發消息給另外一個線程
func (w *worker) PushJob(user string, job *job) {
pair := &jobPair{
key: user,
value: job,
}
w.jobadd <- pair
}
// 并不是真正的map delete操作,僅發消息給另外一個線程
func (w *worker) RemoveJob(user string) {
w.jobdel <- user
}
func (w *worker) Run() {
go func() {
for {
select {
case jobpair := <-w.jobadd:
w.insertJob(jobpair.key, jobpair.value)
case delkey := <-w.jobdel:
w.deleteJob(delkey)
//case other channel
// for _, job := range w.jobqueue {
// do something use job
// log.Println(job)
// }
}
}
}()
}
func (w *worker) insertJob(key string, value *job) error {
w.jobqueue[key] = value
w.pending.Inc()
return nil
}
func (w *worker) deleteJob(key string) {
delete(w.jobqueue, key)
w.pending.Dec()
}
線程池
模型詳見下邊流程圖
線程池模型.png
由具體業務的生產者線程生成一個個不同的job,通過共同的Balance均衡器,將job分配到不同的worker去處理,每個worker占用一個goroutine。在job數量巨多的場景下,這種模型要遠遠優于一個job占用一個goroutine的模型。并且可以根據不同的業務特點以及硬件配置,配置不同的worker數量以及每個worker可以處理的job數量。
我們可以先定義個job結構體,根據業務不同,里邊會包含不同的屬性。
type job struct {
conn net.Conn
opcode int
data []byte
result chan ResultType //可能需要返回處理結果給其他channel
}
type jobPair struct {
key string
value *job
}
然后看下worker定義
type worker struct {
jobqueue map[string]*job // key:UserName
broadcast chan DataType
jobadd chan *jobPair
jobdel chan string
pending safepending
index int
done chan struct{}
}
func NewWorker(idx int, queue_limit int, source_limit int, jobreq_limit int) *worker {
return &worker{
jobqueue: make(map[string]*job, queue_limit),
broadcast: make(chan DataType, source_limit), //4家交易所
jobadd: make(chan jobPair, jobreq_limit),
jobdel: make(chan string, jobreq_limit),
pending: safepending{0, sync.RWMutex{}},
index: idx,
done: make(chan struct{}),
}
}
func (w *worker) PushJob(user string, job *job) {
pair := jobPair{
key: user,
value: job,
}
w.jobadd <- pair
}
func (w *worker) RemoveJob(user string) {
w.jobdel <- user
}
func (w *worker) Run(wg *sync.WaitGroup) {
wg.Add(1)
go func() {
log.Println("new goroutine, worker index:", w.index)
defer wg.Done()
ticker := time.NewTicker(time.Second * 60)
for {
select {
case data := <-w.broadcast:
for _, job := range w.jobqueue {
log.Println(job, data)
}
case jobpair := <-w.jobadd:
w.insertJob(jobpair.key, jobpair.value)
case delkey := <-w.jobdel:
w.deleteJob(delkey)
case <-ticker.C:
w.loadInfo()
case <-w.done:
log.Println("worker", w.index, "exit")
break
}
}
}()
}
func (w *worker) Stop() {
go func() {
w.done <- struct{}{}
}()
}
func (w *worker) insertJob(key string, value *job) error {
w.jobqueue[key] = value
w.pending.Inc()
return nil
}
func (w *worker) deleteJob(key string) {
delete(w.jobqueue, key)
w.pending.Dec()
}
結合上邊提到的小頂堆的實現,我們就可以實現一個帶負載均衡的線程池了。
一種模式并不能應用于所有的業務場景,山楂君覺得重要的是針對不同的業務場景去設計或優化編程模型的能力,以上有不妥之處,歡迎吐槽或指正,喜歡也可以打賞。
參考文獻
- https://blog.golang.org/share-memory-by-communicating
- https://golang.org/doc/effective_go.html
- http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
來自:http://www.jianshu.com/p/215510810c59