Netty 源碼分析之服務端啟動全解析
background
netty 是一個異步事件驅動的網絡通信層框架,其官方文檔的解釋為
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
我們在新美大消息推送系統sailfish(日均推送消息量50億),新美大移動端代理優化系統shark(日均吞吐量30億)中,均選擇了netty作為底層網絡通信框架。
既然兩大如此重要的系統底層都使用到了netty,所以必然要對netty的機制,甚至源碼了若指掌,于是,便催生了netty源碼系列文章。后面,我會通過一系列的主題把我從netty源碼里所學到的毫無保留地介紹給你,源碼基于4.1.6.Final
why netty
netty底層基于jdk的NIO,我們為什么不直接基于jdk的nio或者其他nio框架?下面是我總結出來的原因
1.使用jdk自帶的nio需要了解太多的概念,編程復雜
2.netty底層IO模型隨意切換,而這一切只需要做微小的改動
3.netty自帶的拆包解包,異常檢測等機制讓你從nio的繁重細節中脫離出來,讓你只需要關心業務邏輯
4.netty解決了jdk的很多包括空輪訓在內的bug
5.netty底層對線程,selector做了很多細小的優化,精心設計的reactor線程做到非常高效的并發處理
6.自帶各種協議棧讓你處理任何一種通用協議都幾乎不用親自動手
7.netty社區活躍,遇到問題隨時郵件列表或者issue
8.netty已經歷各大rpc框架,消息中間件,分布式通信中間件線上的廣泛驗證,健壯性無比強大
dive into netty
了解了這么多,今天我們就從一個例子出來,開始我們的netty源碼之旅。
本篇主要講述的是netty是如何綁定端口,啟動服務。啟動服務的過程中,你將會了解到netty各大核心組件,我先不會細講這些組件,而是會告訴你各大組件是怎么串起來組成netty的核心
example
下面是一個非常簡單的服務端啟動代碼
public final class SimpleServer {public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { } }); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private static class SimpleServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded"); } }
}</pre>
簡單的幾行代碼就能開啟一個服務端,端口綁定在8888,使用nio模式,下面講下每一個步驟的處理細節
EventLoopGroup 已經在我的其他文章中詳細剖析過,說白了,就是一個死循環,不停地檢測IO事件,處理IO事件,執行任務
ServerBootstrap 是服務端的一個啟動輔助類,通過給他設置一系列參數來綁定端口啟動服務
group(bossGroup, workerGroup) 我們需要兩種類型的人干活,一個是老板,一個是工人,老板負責從外面接活,接到的活分配給工人干,放到這里, bossGroup 的作用就是不斷地accept到新的連接,將新的連接丟給 workerGroup 來處理
.channel(NioServerSocketChannel.class) 表示服務端啟動的是nio相關的channel,channel在netty里面是一大核心概念,可以理解為一條channel就是一個連接或者一個服務端bind動作,后面會細說
.handler(new SimpleServerHandler() 表示服務器啟動過程中,需要經過哪些流程,這里 SimpleServerHandler 最終的頂層接口為 ChannelHander ,是netty的一大核心概念,表示數據流經過的處理器,可以理解為流水線上的每一道關卡
childHandler(new ChannelInitializer<SocketChannel>)... 表示一條新的連接進來之后,該怎么處理,也就是上面所說的,老板如何給工人配活
ChannelFuture f = b.bind(8888).sync(); 這里就是真正的啟動過程了,綁定8888端口,等待服務器啟動完畢,才會進入下行代碼
f.channel().closeFuture().sync(); 等待服務端關閉socket
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); 關閉兩組死循環
上述代碼可以很輕松地再本地跑起來,最終控制臺的輸出為:
handlerAdded channelRegistered channelActive關于為什么會順序輸出這些,深入分析之后其實很easy
深入細節
ServerBootstrap 一系列的參數配置其實沒啥好講的,無非就是使用 method chaining 的方式將啟動服務器需要的參數保存到filed。我們的重點落入到下面這段代碼
b.bind(8888).sync();這里說一句:我們剛開始看源碼,對細節沒那么清楚的情況下可以借助IDE的debug功能,step by step,one step one test或者二分test的方式,來確定哪行代碼是最終啟動服務的入口,在這里,我們已經確定了bind方法是入口,我們跟進去,分析
public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); }通過端口號創建一個 InetSocketAddress ,然后繼續bind
public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); }validate() 驗證服務啟動需要的必要參數,然后調用 doBind()
private ChannelFuture doBind(final SocketAddress localAddress) { //... final ChannelFuture regFuture = initAndRegister(); //... final Channel channel = regFuture.channel(); //... doBind0(regFuture, channel, localAddress, promise); //... return promise; }這里,我去掉了細枝末節,讓我們專注于核心方法,其實就兩大核心一個是 initAndRegister() ,以及 doBind0()
其實,從方法名上面我們已經可以略窺一二,init->初始化,register->注冊,那么要注冊到什么上面去了,聯系到nio里面輪訓器的注冊,可能是把某個東西初始化好了之后注冊到selector上面去,最后bind,像是在本地綁定端口號,帶著這些猜測,我們深入下去
initAndRegister()
final ChannelFuture initAndRegister() { Channel channel = null; // ... channel = channelFactory.newChannel(); //... init(channel); //... ChannelFuture regFuture = config().group().register(channel); //... return regFuture; }我們還是專注于核心代碼,拋開邊角料,我們看到 initAndRegister() 做了幾件事情
1.new一個channel
2.init這個channel
3.將這個channel register到某個對象
我們逐步分析這三件事情
1.new一個channel
我們首先要搞懂channel的定義,netty官方對channel的描述如下
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind
這里的channel,由于是在服務啟動的時候創建,我們可以和普通Socket編程中的ServerSocket對應上,表示服務端綁定的時候經過的一條流水線
我們發現這條channel是通過一個 channelFactory new出來的, channelFactory 的接口很簡單
public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> { /*** Creates a new channel. */ @Override T newChannel();
}</pre>
就一個方法,我們查看channelFactory被賦值的地方
AbstractBootstrap.java
public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); }this.channelFactory = channelFactory; return (B) this;
}</pre>
在這里被賦值,我們層層回溯,查看該函數被調用的地方,發現最終是在這個函數中,ChannelFactory被new出
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }這里,我們的demo程序調用 channel(channelClass) 方法的時候,將 channelClass 作為 ReflectiveChannelFactory 的構造函數創建出一個 ReflectiveChannelFactory
demo端的代碼如下:
.channel(NioServerSocketChannel.class);然后回到本節最開始
channelFactory.newChannel();我們就可以推斷出,最終是調用到 ReflectiveChannelFactory.newChannel() 方法,跟進
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Class<? extends T> clazz; public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
}</pre>
看到 clazz.newInstance(); ,我們明白了,原來是通過反射的方式來創建一個對象,而這個class就是我們在 ServerBootstrap 中傳入的 NioServerSocketChannel.class
結果,繞了一圈,最終創建channel相當于調用默認構造函數new出一個 NioServerSocketChannel 對象
這里提一下,讀源碼細節,有兩種讀的方式,一種是回溯,比如用到某個對象的時候可以逐層追溯,一定會找到該對象的最開始被創建的代碼區塊,還有一種方式就是自頂向下,逐層分析,一般用在分析某個具體的方法,庖丁解牛,最后拼接出完整的流程
接下來我們就可以將重心放到 NioServerSocketChannel 的默認構造函數
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }private static ServerSocketChannel newSocket(SelectorProvider provider) { //... return provider.openServerSocketChannel(); }通過 SelectorProvider.openServerSocketChannel() 創建一條server端channel,然后進入到以下方法
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }這里第一行代碼就跑到父類里面去了,第二行,new出來一個 NioServerSocketChannelConfig ,其頂層接口為 ChannelConfig ,netty官方的描述如下
A set of configuration properties of a Channel.
基本可以判定, ChannelConfig 也是netty里面的一大核心模塊,初次看源碼,看到這里,我們大可不必深挖這個對象,而是在用到的時候再回來深究,只要記住,這個對象在創建 NioServerSocketChannel 對象的時候被創建即可
我們繼續追蹤到 NioServerSocketChannel 的父類
AbstractNioMessageChannel.java
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); }繼續往上追
AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; //... ch.configureBlocking(false); //... }這里,簡單地將前面 provider.openServerSocketChannel(); 創建出來的 ServerSocketChannel 保存到成員變量,然后調用 ch.configureBlocking(false); 設置該channel為非阻塞模式,標準的jdk nio編程的玩法
這里的 readInterestOp 即前面層層傳入的 SelectionKey.OP_ACCEPT ,接下來重點分析 super(parent); (這里的parent其實是null,由前面寫死傳入)
AbstractChannel.java
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }到了這里,又new出來三大組件,賦值到成員變量,分別為
id = newId(); protected ChannelId newId() { return DefaultChannelId.newInstance(); }id是netty中每條channel的唯一標識,這里不細展開,接著
unsafe = newUnsafe(); protected abstract AbstractUnsafe newUnsafe();查看Unsafe的定義
Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread
成功捕捉netty的又一大組件,我們可以先不用管TA是干嘛的,只需要知道這里的 newUnsafe 方法最終屬于類 NioServerSocketChannel 中
最后
pipeline = newChannelPipeline();protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head;
}</pre>
初次看這段代碼,可能并不知道 DefaultChannelPipeline 是干嘛用的,我們仍然使用上面的方式,查看頂層接口 ChannelPipeline 的定義
A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel
從該類的文檔中可以看出,該接口基本上又是netty的一大核心模塊
到了這里,我們總算把一個服務端channel創建完畢了,將這些細節串起來的時候,我們順帶提取出netty的幾大基本組件,先總結如下
- Channel
- ChannelConfig
- ChannelId
- Unsafe
- Pipeline
- ChannelHander
初次看代碼的時候,我們的目標是跟到服務器啟動的那一行代碼,我們先把以上這幾個組件記下來,等代碼跟完,我們就可以自頂向下,逐層分析,我會放到后面源碼系列中去深入到每個組件
總結一下,用戶調用方法 Bootstrap.bind(port) 第一步就是通過反射的方式new一個 NioServerSocketChannel 對象,并且在new的過程中創建了一系列的核心組件,僅此而已,并無他,真正的啟動我們還需要繼續跟
2.init這個channel
到了這里,你最好跳到文章最開始的地方回憶一下,第一步newChannel完畢,這里就對這個channel做init,init方法具體干啥,我們深入
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); }final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
}</pre>
初次看到這個方法,可能會覺得,哇塞,老長了,這可這么看?還記得我們前面所說的嗎,庖丁解牛,逐步拆解,最后歸一,下面是我的拆解步驟
1.設置option和attr
final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); }final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } }</pre>
通過這里我們可以看到,這里先調用 options0() 以及 attrs0() ,然后將得到的options和attrs注入到channelConfig或者channel中,關于option和attr是干嘛用的,其實你現在不用了解得那么深入,只需要查看最頂層接口 ChannelOption 以及查看一下channel的具體繼承關系,就可以了解,我把這兩個也放到后面的源碼分析系列再講
2.設置新接入channel的option和attr
final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); }這里,和上面類似,只不過不是設置當前channel的這兩個屬性,而是對應到新進來連接對應的channel,由于我們這篇文章只關心到server如何啟動,接入連接放到下一篇文章中詳細剖析
3.加入新連接處理器
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });到了最后一步, p.addLast() 向serverChannel的流水線處理器中加入了一個 ServerBootstrapAcceptor ,從名字上就可以看出來,這是一個接入器,專門接受新請求,把新的請求扔給某個事件循環器,我們先不做過多分析
來,我們總結一下,我們發現其實init也沒有啟動服務,只是初始化了一些基本的配置和屬性,以及在pipeline上加入了一個接入器,用來專門接受新連接,我們還得繼續往下跟
3.將這個channel register到某個對象
這一步,我們是分析如下方法
ChannelFuture regFuture = config().group().register(channel);調用到 NioEventLoop 中的 register
@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }好了,到了這一步,還記得這里的 unsafe() 返回的應該是什么對象嗎?不記得的話可以看下前面關于unsafe的描述,或者最快的方式就是debug到這邊,跟到register方法里面,看看是哪種類型的unsafe
我們跟進去之后發現是
AbstractUnsafe.java
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { // ... AbstractChannel.this.eventLoop = eventLoop; // ... register0(promise); }這里我們依然只需要focus重點,先將EventLoop事件循環器綁定到該NioServerSocketChannel上,然后調用 register0()
private void register0(ChannelPromise promise) { try { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }這一段其實也很清晰,先調用 doRegister(); ,具體干啥待會再講,然后調用 invokeHandlerAddedIfNeeded() , 于是乎,控制臺第一行打印出來的就是
handlerAdded關于最終是如何調用到的,我們后面詳細剖析pipeline的時候再講
然后調用 pipeline.fireChannelRegistered(); 調用之后,控制臺的顯示為
handlerAdded channelRegistered繼續往下跟
if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } }讀到這,你可能會想當然地以為,控制臺最后一行
pipeline.fireChannelActive();由這行代碼輸出,我們不妨先看一下 isActive() 方法
@Override public boolean isActive() { return javaChannel().socket().isBound(); }最終調用到jdk中
ServerSocket.java
/** * Returns the binding state of the ServerSocket. * * @return true if the ServerSocket succesfuly bound to an address * @since 1.4 */ public boolean isBound() { // Before 1.3 ServerSockets were always bound during creation return bound || oldImpl; }這里 isBound() 返回false,但是從目前我們跟下來的流程看,我們并沒有將一個ServerSocket綁定到一個address,所以 isActive() 返回false,我們沒有成功進入到 pipeline.fireChannelActive(); 方法,那么最后一行到底是誰輸出的呢,我們有點抓狂,其實,只要熟練運用IDE,要定位函數調用棧,無比簡單
下面是我用intellij定位函數調用的具體方法
Intellij函數調用定位
我們先在最終輸出文字的這一行代碼處打一個斷點,然后debug,運行到這一行,intellij自動給我們拉起了調用棧,我們唯一要做的事,就是移動方向鍵,就能看到函數的完整的調用鏈
如果你看到方法的最近的發起端是一個線程Runnable的run方法,那么就在提交Runnable對象方法的地方打一個斷點,去掉其他斷點,重新debug,比如我們首次debug發現調用棧中的最近的一個Runnable如下
if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); }我們停在了這一行 pipeline.fireChannelActive(); , 我們想看最初始的調用,就得跳出來,斷點打到 if (!wasActive && isActive()) ,因為netty里面很多任務執行都是異步線程即reactor線程調用的(具體可以看reactor線程三部曲中的最后一曲),如果我們要查看最先發起的方法調用,我們必須得查看Runnable被提交的地方,逐次遞歸下去,就能找到那行"消失的代碼"
最終,通過這種方式,終于找到了 pipeline.fireChannelActive(); 的發起調用的代碼,不巧,剛好就是下面的 doBind0() 方法
doBind0()
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }我們發現,在調用 doBind0(...) 方法的時候,是通過包裝一個Runnable進行異步化的,關于異步化task,可以看下我前面的文章, netty源碼分析之揭開reactor線程的面紗(三)
好,接下來我們進入到 channel.bind() 方法
AbstractChannel.java
@Override public ChannelFuture bind(SocketAddress localAddress) { return pipeline.bind(localAddress); }發現是調用pipeline的bind方法
@Override public final ChannelFuture bind(SocketAddress localAddress) { return tail.bind(localAddress); }相信你對tail是什么不是很了解,可以翻到最開始,tail在創建pipeline的時候出現過,關于pipeline和tail對應的類,我后面源碼系列會詳細解說,這里,你要想知道接下來代碼的走向,唯一一個比較好的方式就是debug 單步進入,篇幅原因,我就不詳細展開
最后,我們來到了如下區域
HeadContext.java
@Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }這里的unsafe就是前面提到的 AbstractUnsafe , 準確點,應該是 NioMessageUnsafe
我們進入到它的bind方法
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { // ... boolean wasActive = isActive(); // ... doBind(localAddress); if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }顯然按照正常流程,我們前面已經分析到 isActive(); 方法返回false,進入到 doBind() 之后,如果channel被激活了,就發起 pipeline.fireChannelActive(); 調用,最終調用到用戶方法,在控制臺打印出了最后一行,所以到了這里,你應該清楚為什么最終會在控制臺按順序打印出那三行字了吧
doBind() 方法也很簡單
protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { //noinspection Since15 javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }最終調到了jdk里面的bind方法,這行代碼過后,正常情況下,就真正進行了端口的綁定。
另外,通過自頂向下的方式分析,在調用 pipeline.fireChannelActive(); 方法的時候,會調用到如下方法
HeadContext.java
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); }進入 readIfIsAutoRead
private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { channel.read(); } }分析 isAutoRead 方法
private volatile int autoRead = 1; public boolean isAutoRead() { return autoRead == 1; }由此可見, isAutoRead 方法默認返回true,于是進入到以下方法
public Channel read() { pipeline.read(); return this; }最終調用到
AbstractNioUnsafe.java
protected void doBeginRead() throws Exception { final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }這里的 this.selectionKey 就是我們在前面register步驟返回的對象,前面我們在register的時候,注冊測ops是0
回憶一下注冊
AbstractNioChannel
selectionKey = javaChannel().register(eventLoop().selector, 0, this)這里相當于把注冊過的ops取出來,通過了if條件,然后調用
selectionKey.interestOps(interestOps | readInterestOp);而這里的 readInterestOp 就是前面newChannel的時候傳入的 SelectionKey.OP_ACCEPT ,又是標準的jdk nio的玩法,到此,你需要了解的細節基本已經差不多了,就這樣結束吧!
summary
最后,我們來做下總結,netty啟動一個服務所經過的流程
1.設置啟動類參數,最重要的就是設置channel
2.創建server對應的channel,創建各大組件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
3.初始化server對應的channel,設置一些attr,option,以及設置子channel的attr,option,給server的channel添加新channel接入器,并出發addHandler,register等事件
4.調用到jdk底層做端口綁定,并觸發active事件,active觸發的時候,真正做服務費端口綁定
另外,文章中閱讀源碼的思路詳細或許也可以給你帶來一些幫助,完。
來自:http://www.jianshu.com/p/c5068caab217