如何在 Go 語言中使用 Redis 連接池

TinaMerritt 10年前發布 | 21K 次閱讀 Google Go/Golang開發

來源:極客頭條

一、關于連接池

一個數據庫服務器只擁有有限的資源,并且如果你沒有充分使用這些資源,你可以通過使用更多的連接來提高吞吐量。一旦所有的資源都在使用,那么你就不能通過增加更多的連接來提高吞吐量。事實上,吞吐量在連接負載較大時就開始下降了。通常可以通過限制與可用的資源相匹配的數據庫連接的數量來提高延遲和吞吐量。

如果不使用連接池,那么,每次傳輸數據,我們都需要進行創建連接,收發數據,關閉連接。在并發量不高的場景,基本上不會有什么問題,一旦并發量上去了,那么,一般就會遇到下面幾個常見問題:

  • 性能普遍上不去
  • CPU 大量資源被系統消耗
  • 網絡一旦抖動,會有大量 TIME_WAIT 產生,不得不定期重啟服務或定期重啟機器
  • 服務器工作不穩定,QPS 忽高忽低

要想解決這些問題,我們就要用到連接池了。連接池的思路很簡單,在初始化時,創建一定數量的連接,先把所有長連接存起來,然后,誰需要使用,從這里取走,干完活立馬放回來。 如果請求數超出連接池容量,那么就排隊等待、退化成短連接或者直接丟棄掉。

二、使用連接池遇到的坑

最近在一個項目中,需要實現一個簡單的 Web Server 提供 Redis 的 HTTP interface,提供 JSON 形式的返回結果。考慮用 Go 來實現。

首先,去看一下 Redis 官方推薦的 Go Redis driver。官方 Star 的項目有兩個:Radix.v2 和 Redigo。經過簡單的比較后,選擇了更加輕量級和實現更加優雅的 Radix.v2。

Radix.v2 包是根據功能劃分成一個個的 sub package,每一個 sub package 在一個獨立的子目錄中,結構非常清晰。我的項目中會用到的 sub package 有 redis 和 pool。

由于我想讓這種被 fork 的進程最好簡單點,做的事情單一一些,所以,在沒有深入去看 Radix.v2 的 pool 的實現之前,我選擇了自己實現一個 Redis pool。(這里,就不貼代碼了。后來發現自己實現的 Redis pool 與 Radix.v2 實現的 Redis pool 的原理是一樣的,都是基于 channel 實現的, 遇到的問題也是一樣的。)

不過在測試過程中,發現了一個詭異的問題。在請求過程中經常會報 EOF 錯誤。而且是概率性出現,一會有問題,一會又好了。通過反復的測試,發現 bug 是有規律的,當程序空閑一會后,再進行連續請求,會發生3次失敗,然后之后的請求都能成功,而我的連接池大小設置的是3。再進一步分析,程序空閑300秒后,再請求就會失敗,發現我的 Redis server 配置了 timeout 300,至此,問題就清楚了。是連接超時 Redis server 主動斷開了連接。客戶端這邊從一個超時的連接請求就會得到 EOF 錯誤。

然后我看了一下 Radix.v2 的 pool 包的源碼,發現這個庫本身并沒有檢測壞的連接,并替換為新的連接的機制。也就是說我每次從連接池里面 Get 的連接有可能是壞的連接。所以,我當時臨時的解決方案是通過增加失敗后自動重試來解決了。不過,這樣的處理方案,連接池的作用好像就沒有了。技術債能早點還的還是早點還上。

三、使用連接池的正確姿勢

想到我們的 ngx_lua 項目里面也大量使用 redis 連接池,他們怎么沒有遇到這個問題呢。只能去看看源碼了。

經過抽象分離, ngx_lua 里面使用 redis 連接池部分的代碼大致是這樣的:

server {
    location /pool {
        content_by_lua_block {
            local redis = require "resty.redis"
            local red = redis:new()

            local ok, err = red:connect("127.0.0.1", 6379)
            if not ok then
                ngx.say("failed to connect: ", err)
                return
            end

            ok, err = red:set("hello", "world")
            if not ok then
                return
            end

            red:set_keepalive(10000, 100)
        }
    }
}

發現有個 set_keepalive 的方法,查了一下官方文檔,方法的原型是 syntax: ok, err = red:set_keepalive(max_idle_timeout, pool_size) 貌似 max_idle_timeout 這個參數,就是我們所缺少的東西,然后進一步跟蹤源碼,看看里面是怎么保證連接有效的。

function _M.set_keepalive(self, ...)
    local sock = self.sock
    if not sock then
        return nil, "not initialized"
    end

    if self.subscribed then
        return nil, "subscribed state"
    end

    return sock:setkeepalive(...)
end

至此,已經清楚了,使用了 tcp 的 keepalive 心跳機制。

于是,通過與 Radix.v2 的作者一些討論,選擇自己在 redis 這層使用心跳機制,來解決這個問題。

四、最后的解決方案

在創建連接池之后,起一個 goroutine,每隔一段 idleTime 發送一個 PING 到 Redis server。其中,idleTime 略小于 Redis server 的 timeout 配置。
連接池初始化部分代碼如下:

p, err := pool.New("tcp", u.Host, concurrency)
errHndlr(err)
go func() {
    for {
        p.Cmd("PING")
        time.Sleep(idelTime * time.Second)
    }
}()

使用 redis 傳輸數據部分代碼如下:

func redisDo(p *pool.Pool, cmd string, args ...interface{}) (reply *redis.Resp, err error) {
    reply = p.Cmd(cmd, args...)
    if err = reply.Err; err != nil {
        if err != io.EOF {
            Fatal.Println("redis", cmd, args, "err is", err)
        }
    }

    return
}

其中,Radix.v2 連接池內部進行了連接池內連接的獲取和放回,代碼如下:

// Cmd automatically gets one client from the pool, executes the given command
// (returning its result), and puts the client back in the pool
func (p *Pool) Cmd(cmd string, args ...interface{}) *redis.Resp {
    c, err := p.Get()
    if err != nil {
        return redis.NewResp(err)
    }
    defer p.Put(c)

    return c.Cmd(cmd, args...)
}

這樣,我們就有了 keepalive 的機制,不會出現 timeout 的連接了,從 redis 連接池里面取出的連接都是可用的連接了。看似簡單的代碼,卻完美的解決了連接池里面超時連接的問題。同時,就算 Redis server 重啟等情況,也能保證連接自動重連。

 本文由用戶 TinaMerritt 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!