Hadoop的Server及其線程模型分析

er74 9年前發布 | 15K 次閱讀 Hadoop 分布式/云計算/大數據

早期的一篇文章,針對Hadoop 2.6.0.這里發一下.

一、Listener

Listener線程,當Server處于運行狀態時,其負責監聽來自客戶端的連接,并使用Select模式處理Accept事件。

同時,它開啟了一個空閑連接(Idle Connection)處理例程,如果有過期的空閑連接,就關閉。這個例程通過一個計時器來實現。

Hadoop的Server及其線程模型分析

當select操作調用時,它可能會阻塞,這給了其它線程執行的機會。當有accept事件發生,它就會被喚醒以處理全部的事件,處理事件是進行一個doAccept的調用。

doAccept:

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    channel.socket().setKeepAlive(true);
    Reader reader = getReader();
    Connection c = connectionManager.register(channel);
    key.attach(c);  // so closeCurrentConnection can get the object
    reader.addConnection(c);
  }
}

由于多個連接可能同時發起申請,所以這里采用了while循環處理。

這里最關鍵的是設置了新建立的socket為非阻塞,這一點是基于性能的考慮,非阻塞的方式盡可能的讀取socket接收緩沖區中的數據,這一點保證了將來會調用這個socket進行接收的Reader和進行發送的Responder線程不會因為發送和接收而阻塞,如果整個通訊過程都比較繁忙,那么Reader和Responder線程的就可以盡量不阻塞在I/O上,這樣可以顯著減少線程上下文切換的次數,提高cpu的利用率。

最后,獲取了一個Reader,將此連接加入Reader的緩沖隊列,同時讓連接管理器監視并管理這個連接的生存期。

獲取Reader的方式如下:

 //最簡單的負載均衡
    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }

二、Reader

當一個新建立的連接被加入Reader的緩沖隊列pendingConnections之后,Reader也被喚醒,以處理此連接上的數據接收。

      public void addConnection(Connection conn) throws InterruptedException {
        pendingConnections.put(conn);
        readSelector.wakeup();
      }

Server中配置了多個Reader線程,顯然是為了提高并發服務連接的能力。

下面是Reader的主要邏輯:

while(true) {
        ...
       //取出一個連接,可能阻塞
    Connection conn = pendingConnections.take();
    //向select注冊一個讀事件
    conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
    ...
    //進行select,可能阻塞
    readSelector.select();
    ...
    //依次讀取數據
    for(keys){
            doRead(key);
       }
       ...
}

當Server還在運行時,Reader線程盡可能多地處理緩沖隊列中的連接,注冊每一個連接的READ事件,采用select模式來獲取連接上有數據接收的通知。當有數據需要接收時,它盡最大可能讀取select返回的連接上的數據,以防止Listener線程因為沒有運行時間而發生饑餓(starving)。

如果Listener線程饑餓,造成的結果是并發能力急劇下降,來自客戶端的新連接請求超時或無法建立。

注意在從緩沖隊列中獲取連接時,Reader可能會發生阻塞,因為它采用了LinkedBlockingQueue類中的take方法,這個方法在隊列為空時會阻塞,這樣Reader線程得以阻塞,以給其它線程執行的時間。

Reader線程的喚醒時機有兩個:

  1. Listener建立了新連接,并將此連接加入1個Reader的緩沖隊列;
  2. select調用返回。

在Reader的doRead調用中,其主要調用了readAndProcess方法,此方法循環處理數據,接收數據包的頭部、上下文頭部和真正的數據。這個過程中值得一提的是下面的這個channelRead方法:

  private int channelRead(ReadableByteChannel channel, 
                        ByteBuffer buffer) throws IOException {

  int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
              channel.read(buffer) : channelIO(channel, null, buffer);
  if (count > 0) {
    rpcMetrics.incrReceivedBytes(count);
  }
  return count;
}

channelRead會判斷數據接收數組buffer中的剩余未讀數據,如果大于一個臨界值NIO_BUFFER_LIMIT,就采取分片的技巧來多次地讀,以防止jdk對large buffer采取變為direct buffer的優化。

這一點,也許是考慮到direct buffer在建立時會有一些開銷,同時在jdk1.6之前direct buffer不會被GC回收,因為它們分配在JVM的堆外的內存空間中。

到底這樣優化的效果如何,沒有測試,也就略過。也許是為了減少GC的負擔。

在Reader讀取到一個完整的RpcRequest包之后,會調用processOneRpc方法,此調用將進入業務邏輯環節。這個方法,會從接受到的數據包中,反序列化出RpcRequest的頭部和數據,依此構造一個RpcRequest對象,設置客戶端需要的跟蹤信息(trace info),然后構造一個Call對象,如下圖所示:

Hadoop的Server及其線程模型分析

此后,在Handler處理時,就以Call為單位,這是一個包含了與連接相關信息的封裝對象。

有了Call對象后,將其加入Server的callQueue隊列,以供Handler處理。因為采用了put方法,所以當callQueue滿時(Handler忙),Reader會發生阻塞,如下所示:

callQueue.put(call);              // queue the call; maybe blocked here

三、Handler

Handler就是根據rpc請求中的方法(Call)及參數,來調用相應的業務邏輯接口來處理請求。

一個Server中有多個Handler,對應多個業務接口,本篇不討論具體業務邏輯。

handler的邏輯基本如下(略去異常和其它次要信息):

public void run() {
    SERVER.set(Server.this);
    ByteArrayOutputStream buf = 
      new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
    while (running) {
      try {
        final Call call = callQueue.take(); // pop the queue; maybe blocked here
        CurCall.set(call);
        try {
          if (call.connection.user == null) {
            value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                         call.timestamp);
          } else {
            value = 
              call.connection.user.doAs(...);
          }
        } catch (Throwable e) {
          //略 ... 
        }
        CurCall.set(null);
        synchronized (call.connection.responseQueue) {
          responder.doRespond(call);
        }
}

可見,Handler從callQueue中取出一個Call,然后調用這個Server.call方法,最后調用Responder的doResponde方法將結果發送給客戶端。

Server.call方法:

    public Writable call(RPC.RpcKind rpcKind, String protocol,
        Writable rpcRequest, long receiveTime) throws Exception {
      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
          receiveTime);
    }

四、Responder

一個Server只有1個Responder線程。

此線程不斷進行如下幾個重要調用以和Handler協調并發送數據:

//這個wait是同步作用,具體見下面分析
waitPending();     
...
//開始select,或許會阻塞
writeSelector.select(PURGE_INTERVAL);
...
//如果selectKeys有數據,就依次異步發送數據
for(selectorKeys){
    doAsyncWrite(key);
}
...
//當到達丟棄時間,會從selectedKeys構造calls,并依次丟棄
for(Call call : calls) {
  doPurge(call, now);
}

當Handler調用doRespond方法后,handler處理的結果被加入responseQueue的隊尾,而不是立即發送回客戶端:

    void doRespond(Call call) throws IOException {
  synchronized (call.connection.responseQueue) {
    call.connection.responseQueue.addLast(call);
    if (call.connection.responseQueue.size() == 1) {
      //注意這里isHandler = true,表示可能會向select注冊寫事件以在Responder主循環中通過select處理數據發送
      processResponse(call.connection.responseQueue, true);
    }
  }
}

上面的synchronized 可以看出,responseQueue是爭用資源,相應的:

Handler是生產者,將結果加入隊列;Responder是消費者,從隊列中取出結果并發送。

processResponse將啟動Responder進行發送,首先從responseQueue中以非阻塞方式取出一個call,然后以非阻塞方式盡力發送call.rpcResponse,如果發送完畢,則返回。

當還有剩余數據未發送,將call插入隊列的第一個位置,由于isHandler參數,在來自Handler的調用中傳入為true,所以會喚醒writeSelector,并注冊一個寫事件,其中incPending()方法,是為了在向selector注冊寫事件時,阻塞Responder線程,后面有分析。

         call.connection.responseQueue.addFirst(call);

if (inHandler) {
  // set the serve time when the response has to be sent later
  call.timestamp = Time.now();

  incPending();
  try {
    // Wakeup the thread blocked on select, only then can the call 
    // to channel.register() complete.
    writeSelector.wakeup();
    channel.register(writeSelector, SelectionKey.OP_WRITE, call);
  } catch (ClosedChannelException e) {
    //Its ok. channel might be closed else where.
    done = true;
  } finally {
    decPending();
  }
}

再回到Responder的主循環,看看如果向select注冊了寫事件會發生什么:

          //執行這句時,如果Handler調用的responder.doResonde()正在向select注冊寫事件,這里就會阻塞
//目的很顯然,是為了下句的select能獲取數據并立即返回,這就減少了阻塞發生的次數
waitPending();     // If a channel is being registered, wait.

//這里用超時阻塞來select,是為了能夠在沒有數據發送時,定期喚醒,以處理長期未得到處理的Call
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
  SelectionKey key = iter.next();
  iter.remove();
  try {
    if (key.isValid() && key.isWritable()) {
        //異步發送
        doAsyncWrite(key);
    }
  } catch (IOException e) {
    LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
  }
}

重點內容都做了注釋,不再贅述。可以看出,既考慮同步,又考慮性能,這是值得學習的地方。

五、總結

本篇著重分析了hadoop的rpc調用中server部分,可以看出,這是一個精良的設計,考慮的很細。

  1. 關于同步:

    Listener生產,Reader消費;Reader生產,Handler消費,Handler生產,Responder消費。

    所以它們之間必須同步.在具體的hadoop實現中,既有利用BlockingQueue的put&take操作實現阻塞,以達到同步目的,也對爭用資源使用synchronized來實現同步。

  2. 關于緩沖:

    其中幾個緩沖隊列也值得關注.Server的并發請求會特別多,而Handler在執行call進行業務邏輯時,肯定會慢下來,所以必須建立請求和處理之間的緩沖。

    另外,處理和發送之間也同樣會出現速率不匹配的現象,同樣需要緩沖。

  3. 關于線程模型:

    Listener單線程,Reader多線程,Handler多線程,Responder單線程,為什么會這樣設計?

    Listener采用select模式處理accept事件,一個客戶端在一段時間內一般只建立有限次的連接,而且連接的建立是比較快的,所以單線程足夠應付,建立后直接丟給Reader,從而自己很從容地應付新連接。

    Handler多線程,業務邏輯是大頭,又很大可能會牽涉I/O密集(HDFS),如果線程少,耗時過長的業務邏輯可能就會讓大部分的Handler線程處于阻塞,這樣輕快的業務邏輯也必須排隊,可能會發生饑餓。如果Reader收集的請求隊列長時間處于滿的狀態,整個通訊必然惡化,所以這是典型的需要降低響應時間、提升吞吐量的高并發時刻,這個時刻的上下文切換是必須的,不糾結,并發為重。

    Responder是單線程,顯然,Responder會比較輕松,因為雖然請求很多,但經過Reader->Handler的緩沖和Handler的處理,上一批能發送完的結果已經發送了。Responder更多的是搜集并處理那些長結果,并通過高效select模式來獲取結果并發送。

    這里,Handler在業務邏輯調用完畢直接調用了responder.doRespond發送,是因為這是個立即返回的調用,這個調用的耗時是很少的,所以不必讓Handler因為發送而阻塞,進一步充分發揮了Handler多線程的能力,減少了線程切換的機會,強調了其多線程并發的優勢,同時又為responder減負,以增強Responder單線程作戰的信心。

  4. 關于鎖對Hadoop來講,因為同步需求,所以加鎖是必不可少的。性能是需要考慮,但是從工程的角度上來看,通訊層的穩定性、代碼可維護性、保持代碼結構的相對簡單性(其代碼因歷史原因已非常復雜),大部分采用了synchronized這種悲觀得、重型的加鎖方式,這樣,可以顯著減少對象之間同步的復雜性,減少錯誤的發生。

六、(補充)RpcServer 線程模型

NameNode啟動過程:

Hadoop的Server及其線程模型分析

線程模型

Listener 1個:

  1. 監聽并接受來自客戶端的連接.將新建連接放入pendingConnections.
  2. 清理空閑連接.
  3. 喚醒Reader.

Reader N個 : 從pendingConnections中獲取連接,讀取數據,從RpcRequest構造Call,并放入callQueue.

Handler N 個:

  1. 從callQueue獲取客戶端調用call,并執行.
  2. 調用Responder,將結果加入responseQueue的尾部.這里會調用一次發送.如果數據未發送完,注冊WRITE事件到selector.并喚醒Responder.

Responder 1個:

  1. 從responseQueue中按照FIFO順序發送數據.
  2. 處理selectorselect出的數據.
  3. 掃描callQueue,并丟棄過期的Call.

Hadoop的Server及其線程模型分析

終.

 本文由用戶 er74 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!