Netty 實現 WebSocket 聊天功能
現在,我們要坐下修改,加入 WebSocket 的支持,使它可以在瀏覽器里進行文本聊天。
準備
<!– more –>
WebSocket
WebSocket 通過“ Upgrade handshake (升級握手)”從標準的 HTTP 或HTTPS 協議轉為 WebSocket。因此,使用 WebSocket 的應用程序將始終以 HTTP/S 開始,然后進行升級。在什么時候發生這種情況取決于具體的應用;它可以是在啟動時,或當一個特定的 URL 被請求時。
在我們的應用中,當 URL 請求以“/ws”結束時,我們才升級協議為WebSocket。否則,服務器將使用基本的 HTTP/S。一旦升級連接將使用的WebSocket 傳輸所有數據。
整個服務器邏輯如下:
1.客戶端/用戶連接到服務器并加入聊天
2.HTTP 請求頁面或 WebSocket 升級握手
3.服務器處理所有客戶端/用戶
4.響應 URI “/”的請求,轉到默認 html 頁面
5.如果訪問的是 URI“/ws” ,處理 WebSocket 升級握手
6.升級握手完成后 ,通過 WebSocket 發送聊天消息
服務端
讓我們從處理 HTTP 請求的實現開始。
處理 HTTP 請求
HttpRequestHandler.java
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //1
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
try {
String path = location.toURI() + "WebsocketChatClient.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.getUri())) {
ctx.fireChannelRead(request.retain()); //2
} else {
if (HttpHeaders.is100ContinueExpected(request)) {
send100Continue(ctx); //3
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4
HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) { //5
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response); //6
if (ctx.pipeline().get(SslHandler.class) == null) { //7
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
} else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); //8
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE); //9
}
file.close();
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"異常");
// 當出現異常就關閉連接
cause.printStackTrace();
ctx.close();
}
}
1.擴展 SimpleChannelInboundHandler 用于處理 FullHttpRequest信息
2.如果請求是 WebSocket 升級,遞增引用計數器(保留)并且將它傳遞給在 ChannelPipeline 中的下個 ChannelInboundHandler
3.處理符合 HTTP 1.1的 “100 Continue” 請求
4.讀取默認的 WebsocketChatClient.html 頁面
5.判斷 keepalive 是否在請求頭里面
6.寫 HttpResponse 到客戶端
7.寫 index.html 到客戶端,判斷 SslHandler 是否在 ChannelPipeline 來決定是使用 DefaultFileRegion 還是 ChunkedNioFile
8.寫并刷新 LastHttpContent 到客戶端,標記響應完成
9.如果 keepalive 沒有要求,當寫完成時,關閉 Channel
HttpRequestHandler 做了下面幾件事,
- 如果該 HTTP 請求被發送到URI “/ws”,調用 FullHttpRequest 上的 retain(),并通過調用 fireChannelRead(msg) 轉發到下一個 ChannelInboundHandler。retain() 是必要的,因為 channelRead() 完成后,它會調用 FullHttpRequest 上的 release() 來釋放其資源。 (請參考我們先前的 SimpleChannelInboundHandler 在第6章中討論)
- 如果客戶端發送的 HTTP 1.1 頭是“Expect: 100-continue” ,將發送“100 Continue”的響應。
- 在 頭被設置后,寫一個 HttpResponse 返回給客戶端。注意,這是不是 FullHttpResponse,唯一的反應的第一部分。此外,我們不使用 writeAndFlush() 在這里 – 這個是在最后完成。
- 如果沒有加密也不壓縮,要達到最大的效率可以是通過存儲 index.html 的內容在一個 DefaultFileRegion 實現。這將利用零拷貝來執行傳輸。出于這個原因,我們檢查,看看是否有一個 SslHandler 在 ChannelPipeline 中。另外,我們使用 ChunkedNioFile。
- 寫 LastHttpContent 來標記響應的結束,并終止它
- 如果不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 對象的最后寫入,并關閉連接。注意,這里我們調用 writeAndFlush() 來刷新所有以前寫的信息。
處理 WebSocket frame
WebSockets 在“幀”里面來發送數據,其中每一個都代表了一個消息的一部分。一個完整的消息可以利用了多個幀。 WebSocket “Request for Comments” (RFC) 定義了六中不同的 frame; Netty 給他們每個都提供了一個 POJO 實現 ,而我們的程序只需要使用下面4個幀類型:
- CloseWebSocketFrame
- PingWebSocketFrame
- PongWebSocketFrame
- TextWebSocketFrame
在這里我們只需要顯示處理 TextWebSocketFrame,其他的會由 WebSocketServerProtocolHandler 自動處理。
下面代碼展示了 ChannelInboundHandler 處理 TextWebSocketFrame,同時也將跟蹤在 ChannelGroup 中所有活動的 WebSocket 連接
TextWebSocketFrameHandler.java
public class TextWebSocketFrameHandler extends
SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame msg) throws Exception { // (1)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
if (channel != incoming){
channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
} else {
channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() ));
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
}
channels.add(ctx.channel());
System.out.println("Client:"+incoming.remoteAddress() +"加入");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 離開"));
}
System.out.println("Client:"+incoming.remoteAddress() +"離開");
channels.remove(ctx.channel());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"在線");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"掉線");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
Channel incoming = ctx.channel();
System.out.println("Client:"+incoming.remoteAddress()+"異常");
// 當出現異常就關閉連接
cause.printStackTrace();
ctx.close();
}
}
1.TextWebSocketFrameHandler 繼承自 SimpleChannelInboundHandler ,這個類實現了 ChannelInboundHandler 接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法。現在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實現接口方法。
2.覆蓋了 handlerAdded() 事件處理方法。每當從服務端收到新的客戶端連接時,客戶端的 Channel 存入 ChannelGroup 列表中,并通知列表中的其他客戶端 Channel
3.覆蓋了 handlerRemoved() 事件處理方法。每當從服務端收到客戶端斷開時,客戶端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客戶端 Channel
4.覆蓋了 channelRead0() 事件處理方法。每當從服務端讀到客戶端寫入信息時,將信息轉發給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時,需要把 channelRead0() 重命名為messageReceived()
5.覆蓋了 channelActive() 事件處理方法。服務端監聽到客戶端活動
6.覆蓋了 channelInactive() 事件處理方法。服務端監聽到客戶端不活動
7.exceptionCaught() 事件處理方法是當出現 Throwable 對象才會被調用,即當 Netty 由于 IO 錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應該被記錄下來并且把關聯的 channel 給關閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實現,比如你可能想在關閉連接之前發送一個錯誤碼的響應消息。
上面顯示了 TextWebSocketFrameHandler 僅作了幾件事:
- 當WebSocket 與新客戶端已成功握手完成,通過寫入信息到 ChannelGroup 中的 Channel 來通知所有連接的客戶端,然后添加新 Channel 到 ChannelGroup
- 如果接收到 TextWebSocketFrame,調用 retain() ,并將其寫、刷新到 ChannelGroup,使所有連接的 WebSocket Channel 都能接收到它。和以前一樣,retain() 是必需的,因為當 channelRead0()返回時,TextWebSocketFrame 的引用計數將遞減。由于所有操作都是異步的,writeAndFlush() 可能會在以后完成,我們不希望它來訪問無效的引用。
由于 Netty 處理了其余大部分功能,唯一剩下的我們現在要做的是初始化 ChannelPipeline 給每一個創建的新的 Channel 。做到這一點,我們需要一個ChannelInitializer
WebsocketChatServerInitializer.java
public class WebsocketChatServerInitializer extends
ChannelInitializer<SocketChannel> { //1
@Override
public void initChannel(SocketChannel ch) throws Exception {//2
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(64*1024));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
1.擴展 ChannelInitializer
2.添加 ChannelHandler 到 ChannelPipeline
initChannel() 方法設置 ChannelPipeline 中所有新注冊的 Channel,安裝所有需要的 ChannelHandler。
WebsocketChatServer.java
編寫一個 main() 方法來啟動服務端。
public class WebsocketChatServer {
private int port;
public WebsocketChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new WebsocketChatServerInitializer()) //(4)
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
System.out.println("WebsocketChatServer 啟動了");
// 綁定端口,開始接收進來的連接
ChannelFuture f = b.bind(port).sync(); // (7)
// 等待服務器 socket 關閉 。
// 在這個例子中,這不會發生,但你可以優雅地關閉你的服務器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("WebsocketChatServer 關閉了");
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new WebsocketChatServer(port).run();
}
}
1. NioEventLoopGroup 是用來處理I/O操作的多線程事件循環器,Netty 提供了許多不同的 EventLoopGroup 的實現用來處理不同的傳輸。在這個例子中我們實現了一個服務端的應用,因此會有2個 NioEventLoopGroup 會被使用。第一個經常被叫做‘boss’,用來接收進來的連接。第二個經常被叫做‘worker’,用來處理已經被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。如何知道多少個線程已經被使用,如何映射到已經創建的 Channel 上都需要依賴于 EventLoopGroup 的實現,并且可以通過構造函數來配置他們的關系。
2. ServerBootstrap 是一個啟動 NIO 服務的輔助啟動類。你可以在這個服務中直接使用 Channel,但是這會是一個復雜的處理過程,在很多情況下你并不需要這樣做。
3.這里我們指定使用 NioServerSocketChannel 類來舉例說明一個新的 Channel 如何接收進來的連接。
4.這里的事件處理類經常會被用來處理一個最近的已經接收的 Channel。SimpleChatServerInitializer 繼承自 ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。也許你想通過增加一些處理類比如 SimpleChatServerHandler 來配置一個新的 Channel 或者其對應的 ChannelPipeline 來實現你的網絡程序。當你的程序變的復雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。
5.你可以設置這里指定的 Channel 實現的配置參數。我們正在寫一個TCP/IP 的服務端,因此我們被允許設置 socket 的參數選項比如tcpNoDelay 和 keepAlive。請參考 ChannelOption 和詳細的 ChannelConfig 實現的接口文檔以此可以對ChannelOption 的有一個大概的認識。
6.option() 是提供給 NioServerSocketChannel 用來接收進來的連接。childOption() 是提供給由父管道 ServerChannel 接收到的連接,在這個例子中也是 NioServerSocketChannel。
7.我們繼續,剩下的就是綁定端口然后啟動服務。這里我們在機器上綁定了機器所有網卡上的 8080 端口。當然現在你可以多次調用 bind() 方法(基于不同綁定地址)。
恭喜!你已經完成了基于 Netty 聊天服務端程序。
客戶端
在程序的 resources 目錄下,我們創建一個 WebsocketChatClient.html 頁面來作為客戶端
WebsocketChatClient.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8080/ws");
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data
};
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "連接開啟!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "連接被關閉";
};
} else {
alert("你的瀏覽器不支持 WebSocket!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("連接沒有開啟.");
}
}
</script>
<form onsubmit="return false;">
<h3>WebSocket 聊天室:</h3>
<textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
<br>
<input type="text" name="message" style="width: 300px" value="Welcome to www.waylau.com">
<input type="button" value="發送消息" onclick="send(this.form.message.value)">
<input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天記錄">
</form>
<br>
<br>
<a >更多例子請訪問 www.waylau.com</a>
</body>
</html>
邏輯比較簡單,不累述。
運行效果
先運行 WebsocketChatServer,再打開多個瀏覽器頁面實現多個 客戶端訪問 http://localhost:8080
參考
-
Netty 4.x 用戶指南 https://github.com/waylau/netty-4-user-guide
-
Netty 實戰(精髓) https://github.com/waylau/essential-netty-in-action
來自:http://www.importnew.com/21561.html