記一次有趣的 Netty 源碼問題
背景
起因是一個朋友問我的一個關于 ServerBootstrap 啟動的問題 .
他的問題我復述一下:
ServerBootstrap 的綁定流程如下:
ServerBootstrap.bind ->
AbstractBootstrap.bind ->
AbstractBootstrap.doBind ->
AbstractBootstrap.initAndRegister ->
AbstractChannel#AbstractUnsafe.register ->
eventLoop.execute( () -> AbstractUnsafe.register0)
doBind0() ->
channel.eventLoop().execute( () -> channel.bind) ->
AbstractUnsafe.bind
在 AbstractUnsafe.register0 中可能會調用 pipeline.fireChannelActive() , 即:
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
doRegister();
...
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
...
}
}
并且在 AbstractUnsafe.bind 中也會有 pipeline.fireChannelActive() 的調用, 即:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
...
}
if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
...
}</code></pre>
那么有沒有可能造成了兩次的 pipeline.fireChannelActive() 調用?
我的回答是不會. 為什么呢? 對于直接想知道答案的朋友可以直接閱讀到最后面的 回答 與 總結 兩節..
下面我們就來根據代碼詳細分析一下.
分析
首先, 根據我們上面所列出的調用流程, 會有 AbstractBootstrap.doBind 的調用, 它的代碼如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 步驟1
final ChannelFuture regFuture = initAndRegister();
...
// 步驟2
if (regFuture.isDone()) {
...
doBind0(regFuture, channel, localAddress, promise);
...
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
...
doBind0(regFuture, channel, localAddress, promise);
}
});
}
}
首先在 doBind 中, 執行步驟1, 即調用 initAndRegister 方法, 這個方法會最終調用到 AbstractChannel#AbstractUnsafe.register . 而在 AbstractChannel#AbstractUnsafe.register 中, 會通過 eventLoop.execute 的形式將 AbstractUnsafe.register0 的調用提交到任務隊列中(即提交到 eventLoop 線程中, 而當前代碼所在的線程是 main 線程), 即:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 當前線程是主線程, 因此這個判斷是 false
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
// register0 在 eventLoop 線程中執行.
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
接著 AbstractBootstrap.initAndRegister 返回, 回到 AbstractBootstrap.doBind 中, 于是執行到步驟2. 注意, 因為 AbstractUnsafe.register0 是在 eventLoop 中執行的, 因此有可能主線程執行到步驟2 時, AbstractUnsafe.register0 已經執行完畢了, 此時必然有 regFuture.isDone() == true ; 但也有可能 AbstractUnsafe.register0 沒有來得及執行, 因此此時 regFuture.isDone() == false . 所以上面的步驟2 考慮到了這兩種情況, 因此分別針對這兩種情況做了區分, 即:
// 步驟2
if (regFuture.isDone()) {
...
doBind0(regFuture, channel, localAddress, promise);
...
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
...
doBind0(regFuture, channel, localAddress, promise);
}
});
}
一般情況下, regFuture.isDone() 為 false, 因為綁定操作是比較費時的, 因此很大幾率會執行到 else 分支, 并且 if 分支和 else 分支從結果上說沒有不同, 而且 if 分支邏輯還更簡單一些, 因此我們以 else 分支來分析吧. 在 else 分支中, 會為 regFuture 設置一個回調監聽器. regFuture 是一個 ChannelFuture , 而 ChannelFuture 代表了一個 Channel 的異步 IO 的操作結果, 因此這里 regFuture 代表了 Channel 注冊(register) 的這個異步 IO 的操作結果.
Netty 這里之所以要為 regFuture 設置一個回調監聽器, 是為了保證 register 和 bind 的時序上的正確性: Channel 的注冊必須要發生在 Channel 的綁定之前 .
(關于時序的正確性的問題, 我們在后面有證明)
接下來我們來看一下 AbstractUnsafe.register0 方法:
private void register0(ChannelPromise promise) {
try {
....
// neverRegistered 一開始是 true, 因此 firstRegistration == true
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// firstRegistration == true, 而 isActive() == false,
// 因此不會執行到 pipeline.fireChannelActive()
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
注意, 我需要再強調一下, 這里 AbstractUnsafe.register0 是在 eventLoop 中執行的.
AbstractUnsafe.register0 中會調用 doRegister() 注冊 NioServerSocketChannel , 然后調用 safeSetSuccess() 設置 promise 的狀態為成功. 而這個 promise 變量是什么呢? 我將 AbstractBootstrap.doBind 的調用鏈寫詳細一些:
AbstractBootstrap.doBind ->
AbstractBootstrap.initAndRegister ->
MultithreadEventLoopGroup.register ->
SingleThreadEventLoop.register ->
AbstractChannel#AbstractUnsafe.register ->
eventLoop.execute( () -> AbstractUnsafe.register0)
在 SingleThreadEventLoop.register 中會實例化一個 DefaultChannelPromise, 即:
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
接著調用重載的 SingleThreadEventLoop.register 方法:
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}</code></pre>
我們看到, 實例化的 DefaultChannelPromise 最終會以方法返回值的方式返回到調用方, 即返回到 AbstractBootstrap.doBind 中:
final ChannelFuture regFuture = initAndRegister();
因此我們這里有一個共識: regFuture 是一個在 SingleThreadEventLoop.register 中實例化的 DefaultChannelPromise 對象.
再回到 SingleThreadEventLoop.register 中, 在這里會調用 channel.unsafe().register(this, promise) , 將 promise 對象傳遞到 AbstractChannel#AbstractUnsafe.register 中, 因此在 AbstractUnsafe.register0 中的 promise 就是 AbstractBootstrap.doBind 中的 regFuture .
promise == regFuture 很關鍵.
既然我們已經確定了 promise 的身份, 那么調用的 safeSetSuccess(promise); 我們也知道是干嘛的了. safeSetSuccess 方法設置一個 Promise 的狀態為 成功態 , 而 Promise 的 成功態 是最終狀態, 即此時 promise.isDone() == true . 那么 設置 promise 為 成功態 后, 會發生什么呢?
還記得不 promise == regFuture , 而我們在 AbstractBootstrap.doBind 的 else 分支中設置了一個回調監聽器:
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
因此當 safeSetSuccess(promise); 調用時, 根據 Netty 的 Promise/Future 機制, 會觸發上面的 operationComplete 回調, 在回調中調用 doBind0 方法:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
注意到, 有一個關鍵的地方, 代碼中將 **channel.bind** 的調用放到了 eventLoop 中執行 . doBind0 返回后, 代碼繼續執行 AbstractUnsafe.register0 方法的剩余部分代碼, 即:
當 AbstractUnsafe.register0 方法執行完畢后, 才執行到 channel.bind 方法.
而 channel.bind 方法最終會調用到 AbstractChannel#AbstractUnsafe.bind 方法, 源碼如下:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
logger.info("---wasActive: {}---", wasActive);
try {
// 調用 NioServerSocketChannel.bind 方法,
// 將底層的 Java NIO SocketChannel 綁定到指定的端口.
// 當 SocketChannel 綁定到端口后, isActive() 才為真.
doBind(localAddress);
} catch (Throwable t) {
...
}
boolean activeNow = isActive();
logger.info("---activeNow: {}---", activeNow);
// 這里 wasActive == false
// isActive() == true
if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}</code></pre>
上面的代碼中, 調用了 doBind(localAddress) 將底層的 Java NIO SocketChannel 綁定到指定的端口. 并且 當 SocketChannel 綁定到端口后, isActive() 才為真.
因此我們知道, 如果 SocketChannel 第一次綁定時, 在調用 doBind 前, wasActive == false == isActive() , 而當調用了 doBind 后, isActive() == true , 因此第一次綁定端口時, if 判斷成立, 會調用 pipeline.fireChannelActive() .
關于 Channel 注冊與綁定的時序問題
我們在前的分析中, 直接認定了 Channel 注冊 在 Channel 的綁定 之前完成, 那么依據是什么呢?
其實所有的關鍵在于 EventLoop 的任務隊列機制 .
不要閑我啰嗦哦. 我們需要繼續回到 AbstractUnsafe.register0 的調用中(再次強調一下, 在 eventLoop 線程中執行AbstractUnsafe.register0), 這個方法我們已經分析了, 它會調用 safeSetSuccess(promise), 并由 Netty 的 Promise/Future 機制, 導致了AbstractBootstrap.doBind 中的 regFuture 所設置的回調監聽器的 operationComplete 方法調用, 而 operationComplete 中調用了 AbstractBootstrap.doBind0 :
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
在 doBind0 中, 根據 EventLoop 的任務隊列機制 , 會使用 eventLoop().execute 將 channel.bind 封裝為一個 Task, 放到 eventLoop 的 taskQueue 中.
如下用一幅圖表示上面的過程:
(圖片插入不了, 暈)
而當 channel.bind 被調度時, AbstractUnsafe.register0 早就已經調用結束了.
因此由于 EventLoop 的任務隊列機制, 我們知道, 在執行 AbstractUnsafe.register0 時, 是在 EventLoop 線程中的, 而 channel.bind 的調用是以 task 的形式添加到 taskQueue 隊列的末尾, 因此必然是有 EventLoop 線程先執行完 AbstractUnsafe.register0 方法后, 才有機會從 taskQueue 中取出一個 task 來執行, 因此這個機制從根本上保證了 Channel 注冊發生在綁定 之前.
回答
你的疑惑是, AbstractChannel#AbstractUnsafe.register0 中, 可能會調用 pipeline.fireChannelActive() , 即:
private void register0(ChannelPromise promise) {
try {
...
boolean firstRegistration = neverRegistered;
doRegister();
...
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
...
}
}
并且在 AbstractChannel#AbstractUnsafe.bind 中也可能會調用到 pipeline.fireChannelActive() , 即:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
...
}
if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
...
}</code></pre>
我覺得是 不會 . 因為根據上面我們分析的結果可知, Netty 的 Promise/Future 與 EventLoop 的任務隊列機制保證了 NioServerSocketChannel 的注冊和 NioServerSocketChannel 的綁定的時序: Channel 的注冊必須要發生在 Channel 的綁定之前 , 而當一個 NioServerSocketChannel 沒有綁定到具體的端口前, 它是 不活躍的(Inactive) , 因此在 register0 中, if (firstRegistration && isActive()) 就不成立, 進而就不會執行到 pipeline.fireChannelActive() 了.
而執行完注冊操作后, 在 AbstractChannel#AbstractUnsafe.bind 才會調用 pipeline.fireChannelActive() , 因此最終只有一次 fireChannelActive 調用.
總結
有兩點需要注意的:
-
isActive() == true 成立的關鍵是此 NioServerSocketChannel 已經綁定到端口上了.
-
由 Promise/Future 與 EventLoop 機制, 導致了 Channel 的注冊 發生在 Channel 的綁定 之前, 因此在 AbstractChannel#AbstractUnsafe.register0 中的 isActive() == false, if 判斷不成立, 最終就是 register0 中的 pipeline.fireChannelActive() 不會被調用.
來自:https://segmentfault.com/a/1190000007422683