如何為可擴展系統進行Socket編程
從簡單I/O到異步非阻塞channel的Java Socket模型演變之旅
上世紀九十年代后期,我在一家在線視頻游戲工資工作,在哪里我主要的工作就是編寫Unix Unix Berkley Socket和Windows WinSock代碼。我的任務是確保視頻游戲客戶端和一個游戲服務器通信。很幸運有這樣的機會寫一些Java Socket代碼,我對Java流式網絡編程和簡潔明了的API著迷。這一點都不讓人驚訝,Java最初就是設計促進智能設備之間的通信,這一點很好的轉移到了桌面應用和服務器應用。
1996年,JavaWorld刊登了Qusay H. Mahmoud的文章”Sockets programming in Java: A tutorial“。文章概述了Java的Socket編程模型。從那以后的18年,這個模型少有變化。這篇文章依然是網絡系統Java socket編程的入門經典。我將在此基礎之上,首先列出一個簡單的客戶端/服務器例子,開啟Java I/O謙卑之旅。此例展示來自java.io包和NIO——Java1.4引起的新的非阻塞I/O API的特性,最后一個例子會涉及Java 7引入的 NIO2 某些特性。
Java的Socket編程:TCP和UDP
Socket編程拆分為兩個系統之間的相互通信,網絡通信有兩種方式:ransport Control Protocol(TCP)和User Datagram Protocol(UDP)。TCP和UDP用途不一,并且有各自獨特的約束:
- TCP協議相對簡單穩定,可以幫助客戶端與一臺服務器建立連接,這樣兩個系統就可以通信。在TCP協議中,每個實體都能保證其通信載荷(communication payload)會被接受。
- UDP是一種非連接協議,適用于那些無需保證每個包都能抵達終點的場景,比如流媒體。
如何區分這兩者的差異?試想,倘若你在自己喜歡的網站上觀看流媒體視頻,這時掉幀會發生什么。你是傾向于客戶端放緩視頻接收丟失的幀,還是繼續觀看視頻呢?典型的流媒體協議采用UDP協議,因為TCP協議保障傳輸,HTTP、FTP、SMTP、POP3等協議會選擇TCP。
以往的Socket編程
早在NIO以前,Java TCP客戶端socket代碼主要由java.net.Socket類來實現。下面的代碼開啟了一個對服務器的連接:
Socket socket = new Socket( server, port );
一旦Socket實例與服務器相連,我們就可以獲得服務器端的輸入輸出流。輸入流用來讀取服務器端的數據,輸出流用來將數據寫回到服務器端。可以執行以下的方法獲取輸入輸出流:
InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream();
這是基本的流——用來讀取或者寫入一個文件的流是相同的,所以我們能夠將其轉換成最好的形式服務于用例中。比如,我們可以用一個PrintStream 包裝 OutputStream,這樣我們就能輕易地用println()等方法對文本進行寫的操作。再比如,我們用BufferedReader包裝 InputStream,再通過InputStreamReader可以很容易的用readLine()等方法對文本進行讀操作。
Java I/O示例第一部分:HTTP客戶端
通過一個簡短的例子來看如何執行HTTP GET獲取一個HTTP服務。HTTP比本例更加復雜成熟,在我們只寫一個客戶端代碼去處理簡單案例。發出一個請求,從服務器端獲取一個資源,同時服務器端返回響應,并關閉流。本案例所需的步驟如下:
- 創建端口為80的網絡服務器所對應的客戶端Socket。
- 從服務器端獲取一個PrintStream,同時發送一個GET PATH HTTP/1.0請求,其中PATH就是服務器上的請求資源。比如,假設你想打開一個網站根目錄,那么path就是 / 。
- 獲取服務器端的InputStream,用一個BufferedReader將其包裝,然后按行讀取響應。
列表1、 SimpleSocketClientExample.java
package com.geekcap.javaworld.simplesocketclient; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.Socket; public class SimpleSocketClientExample { public static void main( String[] args ) { if( args.length < 2 ) { System.out.println( "Usage: SimpleSocketClientExample <server> <path>" ); System.exit( 0 ); } String server = args[ 0 ]; String path = args[ 1 ]; System.out.println( "Loading contents of URL: " + server ); try { // 創建與端口為80的網絡服務器對應的客戶端socket Socket socket = new Socket( server, 80 ); //從服務器端獲取一個PrintStream PrintStream out = new PrintStream( socket.getOutputStream() ); //獲取服務器端的InputStream,用一個BufferedReader將其包裝 BufferedReader in = new BufferedReader( new InputStreamReader( socket.getInputStream() ) ); //發送一個GET PATH HTTP/1.0請求到服務器端 out.println( "GET " + path + " HTTP/1.0" ); out.println(); //按行的讀取服務器端的返回的響應數據 String line = in.readLine(); while( line != null ) { System.out.println( line ); line = in.readLine(); } // 關閉流 in.close(); out.close(); socket.close(); } catch( Exception e ) { e.printStackTrace(); } } }
列表1接受兩個命令行參數:需要連接的服務器,需要取回的資源。創建一個Socket指向服務器端,并且顯式地為其指定端口號80,接著程序會指向這個命令:
GET PATH HTTP/1.0
比如
GET / HTTP/1.0
這個過程中發生了什么?
當你準備從一個web服務器獲取一個網頁,比如 www.google.com, HTTP client利用DNS服務器去獲取服務器地址:從最高域名服務器開始查詢com域名,哪里存有 www.google.com 的權威域名服務器,接著 HTTP client詢問域名服務器 www.google.com 的IP地址。接下來,它會打開一個Socket通向端口80的服務器。最后, HTTP Client執行特定的HTTP方法,比如GET、POST、PUT、DELETE、HEAD 或者OPTI/ONS。每種方法都有自己的語法,如上述的代碼列表中,GET方法后面依次需要一個path、HTTP/版本號、一個空行。如果想加入 HTTP headers,我們必須在進入新的一行之前完成。
在列表1中,獲取了一個 OutputStream,并用 PrintStream 包裝了它,這樣我們就能容易的執行基于文本的命令。 同樣,從 InputStream 獲取的代碼,InputStreamReader 包裝之后,流被轉化成一個Reader,再用 BufferedReader 包裝。這樣我們就能用PrintStream執行GET方法,用BufferedReader 按行讀取響應直到獲取的響應為 null 時結束,最后關閉Socket。
現在我們執行這個類,傳入以下的參數:
java com.geekcap.javaworld.simplesocketclient.SimpleSocketClientExample www.javaworld.com /
你應該能夠看到類似下面的輸出:
Loading contents of URL: www.javaworld.com HTTP/1.1 200 OK Date: Sun, 21 Sep 2014 22:20:13 GMT Server: Apache X-Gas_TTL: 10 Cache-Control: max-age=10 X-GasHost: gas2.usw X-Cooking-With: Gasoline-Local X-Gasoline-Age: 8 Content-Length: 168 Last-Modified: Tue, 24 Jan 2012 00:09:09 GMT Etag: "60001b-a8-4b73af4bf3340" Content-Type: text/html Vary: Accept-Encoding Connection: close <!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8" /> <title>Gasoline Test Page</title> </head> <body> <br><br> <center>Success</center> </body> </html>
本輸出顯示了JavaWorld網站測試頁面,網頁HTTP version 1.1,響應200 OK.
Java I/O示例第二部分:HTTP服務器
剛才我們說了客戶端,幸運的是,服務器端的通信也是很容易。從一個簡單的視角看,處理過程如下:
- 創建一個ServerSocket,并指定一個監聽端口。
- 調用 ServerSocket的 accept() 方法監聽來自客戶端的連接。
- 一旦有客戶端連接服務器,accept() 方法通過服務器與客戶端通信,返回一個Socket。在客戶端用過同樣的Socket類,那么處理過程相同,獲取 InputStream 讀取客戶端信息,OutputStream 寫數據到客戶端。
- 如果服務器需要擴展,你需要將Socket傳給其他的線程去處理,因此服務器可以持續的監聽后來的連接。
- 再次調用 ServerSocket的 accept() 方法監聽其它連接。
正如你所看到的,NIO處理此場景略有不同。可以直接創建ServerSocket,并將一個端口號傳給它用于監聽(關于 ServerSocketFactory 的更多信息會在后面討論):
ServerSocket serverSocket = new ServerSocket( port );
通過 accept() 方法接收傳入的連接:
Socket socket = serverSocket.accept(); // 處理連接……
多線程Socket編程
在如下的列表2中,所有的服務器代碼放在一起組成一個更加健壯的例子,本例中線程處理多個請求。服務器是一個ECHO服務器,就是說會將所有接收到的消息返回。
列表2中的例子不是很復雜,但已經提前介紹了一部分NIO的內容。在線程代碼上花費一些精力,是為了構建一個處理多并發請求的服務器。
列表2、SimpleSocketServer.java
package com.geekcap.javaworld.simplesocketclient; import java.io.BufferedReader; import java.io.I/OException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; public class SimpleSocketServer extends Thread { private ServerSocket serverSocket; private int port; private boolean running = false; public SimpleSocketServer( int port ) { this.port = port; } public void startServer() { try { serverSocket = new ServerSocket( port ); this.start(); } catch (I/OException e) { e.printStackTrace(); } } public void stopServer() { running = false; this.interrupt(); } @Override public void run() { running = true; while( running ) { try { System.out.println( "Listening for a connection" ); // 調用 accept() 處理下一個連接 Socket socket = serverSocket.accept(); // 向 RequestHandler 線程傳遞socket對象進行處理 RequestHandler requestHandler = new RequestHandler( socket ); requestHandler.start(); } catch (I/OException e) { e.printStackTrace(); } } } public static void main( String[] args ) { if( args.length == 0 ) { System.out.println( "Usage: SimpleSocketServer <port>" ); System.exit( 0 ); } int port = Integer.parseInt( args[ 0 ] ); System.out.println( "Start server on port: " + port ); SimpleSocketServer server = new SimpleSocketServer( port ); server.startServer(); // 1分鐘后自動關閉 try { Thread.sleep( 60000 ); } catch( Exception e ) { e.printStackTrace(); } server.stopServer(); } } class RequestHandler extends Thread { private Socket socket; RequestHandler( Socket socket ) { this.socket = socket; } @Override public void run() { try { System.out.println( "Received a connection" ); // 獲取輸入和輸出流 BufferedReader in = new BufferedReader( new InputStreamReader( socket.getInputStream() ) ); PrintWriter out = new PrintWriter( socket.getOutputStream() ); // 向客戶端寫出頭信息 out.println( "Echo Server 1.0" ); out.flush(); // 向客戶端回寫信息,直到客戶端關閉連接或者收到空行 String line = in.readLine(); while( line != null && line.length() > 0 ) { out.println( "Echo: " + line ); out.flush(); line = in.readLine(); } // 關閉自己的連接 in.close(); out.close(); socket.close(); System.out.println( "Connection closed" ); } catch( Exception e ) { e.printStackTrace(); } } }
在列表2中,我們創建了一個新的 SimpleSocketServer 實例,并開啟了這個服務器。繼承 Thread 的 SimpleSocketServer 創建一個新的線程,處理存在于 run() 方法中的阻塞方法 accept() 調用。
run() 方法中存在一個循環,用來接收客戶端請求,并創建RequestHandler線程去處理這些請求。再次強調,這是一個相對簡單的編程,但涉及了相當的線程編程。
RequestHandler 處理客戶端通信代碼與列表1相似:PrintStream 包裝后的 OutputStream 更容易進行寫操作。同 樣,BufferedReader 包裝后的InputStream 更易于讀取。只要服務器在跑,RequestHandler 就會將客戶端的信息按行讀取,并將它們返回給客戶端。如果客戶端發過來的是空行,那對話就結束了,RequestHandler 關閉Socket 。
NIO、NIO2 Socket編程
對于多數應用而言,Java基礎的Socket編程,我們已經做了充分的探討。對于涉及到高強度的 I/O 或者異步輸入輸出,大家就有了熟悉Java NIO和NIO.2中非阻塞API的需要。
JDK1.4 NIO包提供了如下重要特性:
- Channel 被設計用來支持塊(bulk)轉移,從一個NIO轉到另一個NIO。
- Buffer 提供了連續的內存塊,由一組簡單的操作提供接口。
- 非阻塞I/O 是一組class文件,它們可以將 Channel 開放給普通的I/O資源,比如文件和Socket。
用NIO編碼時,你可以打開一個到目的地的Channel,接著從目的地讀取數據到一個buffer中;寫入數據到一個buffer中,接著將其發送到目的地。我會創建一個Socket,并為此獲取一個Channel。但首先讓我們回顧一下buffer的處理流程:
- 寫數據到一個buffer中。
- 調用buffer的 flip() 方法準備讀的操作。
- 從buffer中讀取數據。
- 調用buffer中的 clear() 或者 compact() 方法準備讀取更多的數據。
當數據寫入buffer后,buffer知道寫入其中的數據量。它維護了三個屬性,在讀模式和寫模式中其含義不盡相同。
- Position:在寫模式中,初始position值為0,它存儲的是寫入buffer后的當前位置;一旦flip一個buffer使其進入讀模式,它會將位置的值重置為0,然后存儲讀取buffer后的當前位置。
- Capacity:指的是buffer的固定大小。
- Limit:在寫模式中,limit定義了寫入buffer的數據大小;在讀模式中,limit定義了可以從buffer中讀取的數據大小。
Java I/O示例第三部分:基于NIO.2的ECHO服務器
JDK 7引入的NIO.2添加了非阻塞I/O庫去支持文件系統任務,比如 java.nio.file 包和 java.nio.file.Path 類,并提供了一個 新的文件系統API。記住,我們采用IO.2 AsynchronousServerSocketChannel 寫一個新的ECHO服務器。
”NIO在提供處理性能方法大放異彩,但NIO的結果跟底層平臺緊密相連。比如,或許你會發現,NIO加速應用性能不光取決于OS,還跟特定的JVM有關,主機的虛擬化上下文、大存儲特性、甚至數據……”
——摘自”Five ways to maximize Java NIO and NIO.2“
AsynchronousServerSocketChannel 提供了一個非阻塞異步Channel作為流定向監聽的Socket。為了用這個Channel,首先需要執行它的 open() 靜態方法。然后調用 bind() 為其綁定一個端口號。接著,將一個實現CompletionHandler接口的類傳給 accept() 并執行。多數時候,你會發現 handler作為匿名內部類被創建。
列表3顯示新的異步ECHO服務器源碼。
列表3、SimpleSocketServer.java
package com.geekcap.javaworld.nio2; import java.io.I/OException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class NioSocketServer { public NioSocketServer() { try { // 創建一個 AsynchronousServerSocketChannel 偵聽 5000 端口 final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5000)); // 偵聽新的請求 listener.accept( null, new CompletionHandler<AsynchronousSocketChannel,Void>() { @Override public void completed(AsynchronousSocketChannel ch, Void att) { // 接受下一個連接 listener.accept( null, this ); // 向客戶端發送問候信息 ch.write( ByteBuffer.wrap( "Hello, I am Echo Server 2020, let's have an engaging conversation!n".getBytes() ) ); // 分配(4K)字節緩沖用于從客戶端讀取信息 ByteBuffer byteBuffer = ByteBuffer.allocate( 4096 ); try { // Read the first line int bytesRead = ch.read( byteBuffer ).get( 20, TimeUnit.SECONDS ); boolean running = true; while( bytesRead != -1 && running ) { System.out.println( "bytes read: " + bytesRead ); // 確保有讀取到數據 if( byteBuffer.position() > 2 ) { // 準備緩存進行讀取 byteBuffer.flip(); // 把緩存轉換成字符串 byte[] lineBytes = new byte[ bytesRead ]; byteBuffer.get( lineBytes, 0, bytesRead ); String line = new String( lineBytes ); // Debug System.out.println( "Message: " + line ); // 向調用者回寫 ch.write( ByteBuffer.wrap( line.getBytes() ) ); // 準備緩沖進行寫操作 byteBuffer.clear(); // 讀取下一行 bytesRead = ch.read( byteBuffer ).get( 20, TimeUnit.SECONDS ); } else { // 在我們的協議中,空行表示會話結束 running = false; } } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { // 用戶達到20秒超時,關閉連接 ch.write( ByteBuffer.wrap( "Good Byen".getBytes() ) ); System.out.println( "Connection timed out, closing connection" ); } System.out.println( "End of conversation" ); try { // 如果需要,關閉連接 if( ch.isOpen() ) { ch.close(); } } catch (I/OException e1) { e1.printStackTrace(); } } @Override public void failed(Throwable exc, Void att) { ///... } }); } catch (I/OException e) { e.printStackTrace(); } } public static void main( String[] args ) { NioSocketServer server = new NioSocketServer(); try { Thread.sleep( 60000 ); } catch( Exception e ) { e.printStackTrace(); } } }
在列表3中,我們首先創建了一個新的AsynchronousServerSocketChannel,然后為其綁定端口號5000:
final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5000));
調用 AsynchronousServerSocketChannel 的 accept(),通知其監聽一個連接,并將一個典型的CompletionHandler傳給它。一旦調用 accept(),結果會立即返回。注意,本例不同于列表2中的ServerSocket類;除非一個客戶端與ServerSocket相連,否則accept()會被阻塞。AsynchronousChannelGroup 的 accept() 會為我們解決這個問題。
完整的Handler處理
接 下來的主要任務就是創建一個 CompletionHandler 類,并實現 completed() 和 failed() 方法。當 AsynchronousServerSocketChannel 接收一個客戶端連接,這個連接包含一個連接客戶端的 AsynchronousSocketChannel,completed()方法就會被調用。completed()方法第一次被調用從AsynchronousServerSocketChannel 處接收連接,開始與客戶端進行通信。首先它做的事情向客戶端寫入一個“hello”消息:建立一個字符串,并將其轉換成字節數組并將其傳給 ByteBuffer.wrap(),完了構造一個ByteBuffer。接著ByteBuffer傳給 AsynchronousSocketChannel的 write() 方法。
為了更夠從客戶端那里讀取數據,我們創建了一個新的ByteBuffer,并調用它的allocate(4096)。接 著我們調用了AsynchronousSocketChannel的 read() 方法,此方法會返回一個 Future<Integer>,調用后者的 get() 方法可以獲取讀自客戶端的字節數。在本例中,我們傳遞了20秒的timeout參數給 get();如果20分鐘沒有得到響應,那 get() 就會拋出一個TimeoutException。本回響服務器的應對策略是,如果20秒沒有響應,就終止這個對話。
異步計算中的Future
“The Future<V>接口顯示一個異步計算的結果,此結果作為一個Future,因為它直到未來的某個時刻才存在。你可以調用它的方法去取消一個任務,返回任務的結果——如果任務沒有完成,無限等待或者超時退出——并且決定任務是否已取消或者完成……”。
——摘自”Java concurrency without the pain, Part 1“
接下來我們會檢測buffer的position,它會定位到最后一個來自客戶端的byte。倘若客戶端發來的是一個空行,接收兩個字節:一個回車和一個換行。檢測確保客戶端發出一個空白行,我們以此作為客戶端對話結束的信號。如果我們擁有有意義的數據,那我們就調用ByteBuffer的 flip() 方法去進入讀的狀態。我們可以創建一個臨時byte數組去存儲讀自客戶端的數據,然后調用ByteBuffer的 get() 加載數據到byte數組中。最后,我們通過創建一個新的String對象將數組轉換成一行字符串。我們將這行字符串返回給客戶端:將字符串line轉換成一個byte數組,作為參數傳遞給 ByteBuffer.wrap(),然后調用 AsynchronousSocketChannel的write() 方法。接著調用ByteBuffer的clear(),這樣position被重置為0并將ByteBuffer置于寫的模式,接著我們讀取客戶端下一行。
需要注意的是 main() 方法。它 創建了服務器,同時創建了一個讓應用跑60秒的計時器。這是因為AsynchronousSocketChannel的 accept() 會理解返回,如果線程 Thread.sleep() 不執行,應用將會立即停止。為了進行測試,啟動服務器后用telnet客戶端進行連接:
telnet localhost 5000
發送少量的字符串給服務器,觀察它們向你返回結果,然后發送一個空行結束對話。
結語
本文展示了兩種Socket Java編程方式:傳統的Java 1.0引入的編寫方式,Java 1.4和Java 7中分別引入的非阻塞 NIO 和 NIO.2 方式。采用客戶端服務器幾次迭代的例子,展示了基本 Java I/O的使用,以及一些場景下非阻塞I/O對Java socket編程模型的改進和簡化。利用非阻塞I/O,你可以編寫網絡應用來處理多并發連接,而無需管理多線程集合。同樣,你也可以利用構建在NIO和 NIO.2上新的服務器擴展特性。
原文鏈接: javaworld 翻譯: ImportNew.com - 喬永琪
譯文鏈接: http://www.importnew.com/15996.html