NSQ系列之nsqlookupd代碼分析一(初探nsqlookup)

jopen 10年前發布 | 16K 次閱讀 NSQ NoSQL數據庫

nsqlookupd 是守護進程負責管理拓撲信息。客戶端通過查詢 nsqlookupd 來發現指定話題(topic)的生產者,并且 nsqd 節點廣播話題(topic)和通道(channel)信息。 nsqlookupd 有兩個接口:TCP 接口,nsqd 用它來廣播。HTTP 接口,客戶端用它來發現和管理。

NSQ系列之nsqlookupd代碼分析一(初探nsqlookup)

nsqlookupd是守護進程負責管理拓撲信息。客戶端通過查詢nsqlookupd來發現指定話題(topic)的生產者,并且nsqd節點廣播話題(topic)和通道(channel)信息。

nsqlookupd有兩個接口:TCP接口,nsqd用它來廣播。HTTP接口,客戶端用它來發現和管理。

nsqlookup struct分析

代碼文件路徑為nsq/nsqlookupd/nsqlookupd.go

type NSQLookupd struct {
    sync.RWMutex               //讀寫鎖
    opts         *Options  //nsqlookupd 配置信息 定義文件路徑為nsq/nsqlookupd/options.go
    tcpListener  net.Listener 
    httpListener net.Listener 
    waitGroup    util.WaitGroupWrapper //WaitGroup 典型應用 用于開啟兩個goroutine,一個監聽HTTP 一個監聽TCP
    DB           *RegistrationDB //product 注冊數據庫 具體分析后面章節再講
}

//初始化NSQLookupd實例
func New(opts *Options) *NSQLookupd {
    n := &NSQLookupd{
        opts: opts,
        DB:   NewRegistrationDB(), //初始化DB實例
    }
    n.logf(version.String("nsqlookupd"))
    return n
}


func (l *NSQLookupd) Main() {
    ctx := &Context{l} //初始化Context實例將NSQLookupd指針放入Context實例中 Context結構請參考文件nsq/nsqlookupd/context.go Context用于nsqlookupd中的tcpServer 和 httpServer中

    tcpListener, err := net.Listen("tcp", l.opts.TCPAddress) //開啟TCP監聽
    if err != nil {
        l.logf("FATAL: listen (%s) failed - %s", l.opts.TCPAddress, err)
        os.Exit(1)
    }
    l.Lock()
    l.tcpListener = tcpListener
    l.Unlock()
    tcpServer := &tcpServer{ctx: ctx} //創建一個tcpServer tcpServer 實現了nsq/internal/protocol包中的TCPHandler接口
    l.waitGroup.Wrap(func() {
            //protocol.TCPServer方法的過程就是tcpListener accept tcp的連接
            //然后通過tcpServer中的Handle分析報文,然后處理相關的協議
        protocol.TCPServer(tcpListener, tcpServer, l.opts.Logger)
    }) //把tcpServer加入到waitGroup

    httpListener, err := net.Listen("tcp", l.opts.HTTPAddress) //開啟HTTP監聽
    if err != nil {
        l.logf("FATAL: listen (%s) failed - %s", l.opts.HTTPAddress, err)
        os.Exit(1)
    }
    l.Lock()
    l.httpListener = httpListener
    l.Unlock()
    httpServer := newHTTPServer(ctx) //創建一個httpServer
    l.waitGroup.Wrap(func() {
        http_api.Serve(httpListener, httpServer, "HTTP", l.opts.Logger)
    }) //把httpServer加入到waitGroup
}


//NSQLookupd退出
func (l *NSQLookupd) Exit() {
    if l.tcpListener != nil {
        l.tcpListener.Close() //關閉tcpListener
    }

    if l.httpListener != nil {
        l.httpListener.Close() //關閉httpListener
    }
    l.waitGroup.Wait()
}

這一章節的代碼就先分析到這里了,下一章節要分析的是nsqlookup中的tcpServer.

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!