Java核心知識點-NIO

s.w.pollux 8年前發布 | 24K 次閱讀 Java NIO Java開發

文件讀取中的NIO

在Java1.4之前的I/O系統中,提供的都是面向流的I/O系統,系統一次一個字節地處理數據,一個輸入流產生一個字節的數據,一個輸出流消費一個字節的數據,面向流的I/O速度非常慢,而在Java 1.4中推出了NIO,這是一個面向塊的I/O系統,系統以塊的方式處理處理,每一個操作在一步中產生或者消費一個數據庫,按塊處理要比按字節處理數據快的多。

在NIO中有幾個核心對象需要掌握:緩沖區(Buffer)、通道(Channel)、選擇器(Selector)。

緩沖區Buffer

緩沖區實際上是一個容器對象,更直接的說,其實就是一個數組,在NIO庫中,所有數據都是用緩沖區處理的。在讀取數據時,它是直接讀到緩沖區中的; 在寫入數據時,它也是寫入到緩沖區中的;任何時候訪問 NIO 中的數據,都是將它放到緩沖區中。而在面向流I/O系統中,所有數據都是直接寫入或者直接將數據讀取到Stream對象中。

在NIO中,所有的緩沖區類型都繼承于抽象類Buffer,最常用的就是ByteBuffer,對于Java中的基本類型,基本都有一個具體Buffer類型與之相對應,它們之間的繼承關系如下圖所示:

 

下面是一個簡單的使用IntBuffer的例子:

import java.nio.IntBuffer;

public class TestIntBuffer { public static void main(String[] args) { // 分配新的int緩沖區,參數為緩沖區容量 // 新緩沖區的當前位置將為零,其界限(限制位置)將為其容量。它將具有一個底層實現數組,其數組偏移量將為零。 IntBuffer buffer = IntBuffer.allocate(8);

    for (int i = 0; i < buffer.capacity(); ++i) {
        int j = 2 * (i + 1);
        // 將給定整數寫入此緩沖區的當前位置,當前位置遞增
        buffer.put(j);
    }

    // 重設此緩沖區,將限制設置為當前位置,然后將當前位置設置為0
    buffer.flip();

    // 查看在當前位置和限制位置之間是否有元素
    while (buffer.hasRemaining()) {
        // 讀取此緩沖區當前位置的整數,然后當前位置遞增
        int j = buffer.get();
        System.out.print(j + "  ");
    }

}

}</code></pre>

運行后可以看到:

通道Channel

通道是一個對象,通過它可以讀取和寫入數據,當然了所有數據都通過Buffer對象來處理。我們永遠不會將字節直接寫入通道中,相反是將數據寫入包含一個或者多個字節的緩沖區。同樣不會直接從通道中讀取字節,而是將數據從通道讀入緩沖區,再從緩沖區獲取這個字節。

在NIO中,提供了多種通道對象,而所有的通道對象都實現了Channel接口。它們之間的繼承關系如下圖所示:

使用NIO讀取數據

在前面我們說過,任何時候讀取數據,都不是直接從通道讀取,而是從通道讀取到緩沖區。所以使用NIO讀取數據可以分為下面三個步驟:
1. 從FileInputStream獲取Channel
2. 創建Buffer
3. 將數據從Channel讀取到Buffer中

下面是一個簡單的使用NIO從文件中讀取數據的例子:

import java.io.;
import java.nio.;
import java.nio.channels.*;

public class Program { static public void main( String args[] ) throws Exception { FileInputStream fin = new FileInputStream("c:\test.txt");

    // 獲取通道
    FileChannel fc = fin.getChannel();

    // 創建緩沖區
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    // 讀取數據到緩沖區
    fc.read(buffer);

    buffer.flip();

    while (buffer.remaining()>0) {
        byte b = buffer.get();
        System.out.print(((char)b));
    }

    fin.close();
}

}</code></pre>

使用NIO寫入數據

使用NIO寫入數據與讀取數據的過程類似,同樣數據不是直接寫入通道,而是寫入緩沖區,可以分為下面三個步驟:
1. 從FileInputStream獲取Channel
2. 創建Buffer
3. 將數據從Channel寫入到Buffer中

下面是一個簡單的使用NIO向文件中寫入數據的例子:

import java.io.*;  
import java.nio.*;  
import java.nio.channels.*;  
  
public class Program {  
    static private final byte message[] = { 83, 111, 109, 101, 32,  
        98, 121, 116, 101, 115, 46 };  
  
    static public void main( String args[] ) throws Exception {  
        FileOutputStream fout = new FileOutputStream( "c:\\test.txt" );  
          
        FileChannel fc = fout.getChannel();  
          
        ByteBuffer buffer = ByteBuffer.allocate( 1024 );  
          
        for (int i=0; i<message.length; ++i) {  
            buffer.put( message[i] );  
        }  
          
        buffer.flip();  
          
        fc.write( buffer );  
          
        fout.close();  
    }  
}  

在第一篇中,我們介紹了NIO中的兩個核心對象:緩沖區和通道,在談到緩沖區時,我們說緩沖區對象本質上是一個數組,但它其實是一個特殊的數組,緩沖區對象內置了一些機制,能夠跟蹤和記錄緩沖區的狀態變化情況,如果我們使用get()方法從緩沖區獲取數據或者使用put()方法把數據寫入緩沖區,都會引起緩沖區狀態的變化。本文為NIO使用及原理分析的第二篇,將會分析NIO中的Buffer對象。

在緩沖區中,最重要的屬性有下面三個,它們一起合作完成對緩沖區內部狀態的變化跟蹤:

position:指定了下一個將要被寫入或者讀取的元素索引,它的值由get()/put()方法自動更新,在新創建一個Buffer對象時,position被初始化為0。

limit:指定還有多少數據需要取出(在從緩沖區寫入通道時),或者還有多少空間可以放入數據(在從通道讀入緩沖區時)。

capacity:指定了可以存儲在緩沖區中的最大數據容量,實際上,它指定了底層數組的大小,或者至少是指定了準許我們使用的底層數組的容量。

以上四個屬性值之間有一些相對大小的關系:0 <= position <= limit <= capacity。如果我們創建一個新的容量大小為10的ByteBuffer對象,在初始化的時候,position設置為0,limit和 capacity被設置為10,在以后使用ByteBuffer對象過程中,capacity的值不會再發生變化,而其它兩個個將會隨著使用而變化。四個屬性值分別如圖所示:

 

現在我們可以從通道中讀取一些數據到緩沖區中,注意從通道讀取數據,相當于往緩沖區中寫入數據。如果讀取4個自己的數據,則此時position的值為4,即下一個將要被寫入的字節索引為4,而limit仍然是10,如下圖所示:

 

下一步把讀取的數據寫入到輸出通道中,相當于從緩沖區中讀取數據,在此之前,必須調用flip()方法,該方法將會完成兩件事情:

1. 把limit設置為當前的position值
2. 把position設置為0

由于position被設置為0,所以可以保證在下一步輸出時讀取到的是緩沖區中的第一個字節,而limit被設置為當前的position,可以保證讀取的數據正好是之前寫入到緩沖區中的數據,如下圖所示:

 

現在調用get()方法從緩沖區中讀取數據寫入到輸出通道,這會導致position的增加而limit保持不變,但position不會超過limit的值,所以在讀取我們之前寫入到緩沖區中的4個自己之后,position和limit的值都為4,如下圖所示:

 

在從緩沖區中讀取數據完畢后,limit的值仍然保持在我們調用flip()方法時的值,調用clear()方法能夠把所有的狀態變化設置為初始化時的值,如下圖所示:

 

最后我們用一段代碼來驗證這個過程,如下所示:

import java.io.;
import java.nio.;
import java.nio.channels.*;

public class Program { public static void main(String args[]) throws Exception { FileInputStream fin = new FileInputStream("d:\test.txt"); FileChannel fc = fin.getChannel();

    ByteBuffer buffer = ByteBuffer.allocate(10);
    output("初始化", buffer);

    fc.read(buffer);
    output("調用read()", buffer);

    buffer.flip();
    output("調用flip()", buffer);

    while (buffer.remaining() > 0) {
        byte b = buffer.get();
        // System.out.print(((char)b));
    }
    output("調用get()", buffer);

    buffer.clear();
    output("調用clear()", buffer);

    fin.close();
}

public static void output(String step, Buffer buffer) {
    System.out.println(step + " : ");
    System.out.print("capacity: " + buffer.capacity() + ", ");
    System.out.print("position: " + buffer.position() + ", ");
    System.out.println("limit: " + buffer.limit());
    System.out.println();
}

}</code></pre>

完成的輸出結果為:

在上一篇文章中介紹了緩沖區內部對于狀態變化的跟蹤機制,而對于NIO中緩沖區來說,還有很多的內容值的學習,如緩沖區的分片與數據共享,只讀緩沖區等。在本文中我們來看一下緩沖區一些更細節的內容。

緩沖區的分配

在前面的幾個例子中,我們已經看過了,在創建一個緩沖區對象時,會調用靜態方法allocate()來指定緩沖區的容量,其實調用 allocate()相當于創建了一個指定大小的數組,并把它包裝為緩沖區對象。或者我們也可以直接將一個現有的數組,包裝為緩沖區對象,如下示例代碼所示:

public class BufferWrap {

public void myMethod()
{
    // 分配指定大小的緩沖區
    ByteBuffer buffer1 = ByteBuffer.allocate(10);

    // 包裝一個現有的數組
    byte array[] = new byte[10];
    ByteBuffer buffer2 = ByteBuffer.wrap( array );
}

}</code></pre>

緩沖區分片

在NIO中,除了可以分配或者包裝一個緩沖區對象外,還可以根據現有的緩沖區對象來創建一個子緩沖區,即在現有緩沖區上切出一片來作為一個新的緩沖區,但現有的緩沖區與創建的子緩沖區在底層數組層面上是數據共享的,也就是說,子緩沖區相當于是現有緩沖區的一個視圖窗口。調用slice()方法可以創建一個子緩沖區,讓我們通過例子來看一下:

import java.nio.*;

public class Program { static public void main( String args[] ) throws Exception { ByteBuffer buffer = ByteBuffer.allocate( 10 );

    // 緩沖區中的數據0-9
    for (int i=0; i<buffer.capacity(); ++i) {
        buffer.put( (byte)i );
    }

    // 創建子緩沖區
    buffer.position( 3 );
    buffer.limit( 7 );
    ByteBuffer slice = buffer.slice();

    // 改變子緩沖區的內容
    for (int i=0; i<slice.capacity(); ++i) {
        byte b = slice.get( i );
        b *= 10;
        slice.put( i, b );
    }

    buffer.position( 0 );
    buffer.limit( buffer.capacity() );

    while (buffer.remaining()>0) {
        System.out.println( buffer.get() );
    }
}

}</code></pre>

在該示例中,分配了一個容量大小為10的緩沖區,并在其中放入了數據0-9,而在該緩沖區基礎之上又創建了一個子緩沖區,并改變子緩沖區中的內容,從最后輸出的結果來看,只有子緩沖區“可見的”那部分數據發生了變化,并且說明子緩沖區與原緩沖區是數據共享的,輸出結果如下所示:

 

只讀緩沖區

只讀緩沖區非常簡單,可以讀取它們,但是不能向它們寫入數據。可以通過調用緩沖區的asReadOnlyBuffer()方法,將任何常規緩沖區轉 換為只讀緩沖區,這個方法返回一個與原緩沖區完全相同的緩沖區,并與原緩沖區共享數據,只不過它是只讀的。如果原緩沖區的內容發生了變化,只讀緩沖區的內容也隨之發生變化:

import java.nio.*;

public class Program { static public void main( String args[] ) throws Exception { ByteBuffer buffer = ByteBuffer.allocate( 10 );

    // 緩沖區中的數據0-9
    for (int i=0; i<buffer.capacity(); ++i) {
        buffer.put( (byte)i );
    }

    // 創建只讀緩沖區
    ByteBuffer readonly = buffer.asReadOnlyBuffer();

    // 改變原緩沖區的內容
    for (int i=0; i<buffer.capacity(); ++i) {
        byte b = buffer.get( i );
        b *= 10;
        buffer.put( i, b );
    }

    readonly.position(0);
    readonly.limit(buffer.capacity());

    // 只讀緩沖區的內容也隨之改變
    while (readonly.remaining()>0) {
        System.out.println( readonly.get());
    }
}

}</code></pre>

 

如果嘗試修改只讀緩沖區的內容,則會報ReadOnlyBufferException異常。只讀緩沖區對于保護數據很有用。在將緩沖區傳遞給某個 對象的方法時,無法知道這個方法是否會修改緩沖區中的數據。創建一個只讀的緩沖區可以保證該緩沖區不會被修改。只可以把常規緩沖區轉換為只讀緩沖區,而不能將只讀的緩沖區轉換為可寫的緩沖區。

直接緩沖區

直接緩沖區是為加快I/O速度,使用一種特殊方式為其分配內存的緩沖區,JDK文檔中的描述為:給定一個直接字節緩沖區,Java虛擬機將盡最大努 力直接對它執行本機I/O操作。也就是說,它會在每一次調用底層操作系統的本機I/O操作之前(或之后),嘗試避免將緩沖區的內容拷貝到一個中間緩沖區中 或者從一個中間緩沖區中拷貝數據。要分配直接緩沖區,需要調用allocateDirect()方法,而不是allocate()方法,使用方式與普通緩沖區并無區別,如下面的拷貝文件示例:

import java.io.;
import java.nio.;
import java.nio.channels.*;

public class Program { static public void main( String args[] ) throws Exception { String infile = "c:\test.txt"; FileInputStream fin = new FileInputStream( infile ); FileChannel fcin = fin.getChannel();

    String outfile = String.format("c:\\testcopy.txt");
    FileOutputStream fout = new FileOutputStream( outfile );    
    FileChannel fcout = fout.getChannel();

    // 使用allocateDirect,而不是allocate
    ByteBuffer buffer = ByteBuffer.allocateDirect( 1024 );

    while (true) {
        buffer.clear();

        int r = fcin.read( buffer );

        if (r==-1) {
            break;
        }

        buffer.flip();

        fcout.write( buffer );
    }
}

}</code></pre>

內存映射文件I/O

內存映射文件I/O是一種讀和寫文件數據的方法,它可以比常規的基于流或者基于通道的I/O快的多。內存映射文件I/O是通過使文件中的數據出現為 內存數組的內容來完成的,這其初聽起來似乎不過就是將整個文件讀到內存中,但是事實上并不是這樣。一般來說,只有文件中實際讀取或者寫入的部分才會映射到內存中。如下面的示例代碼:

import java.io.;
import java.nio.;
import java.nio.channels.*;

public class Program { static private final int start = 0;<span style="font-family:FangSong_GB2312;font-size:13px;"> static private final int size = 1024;

static public void main( String args[] ) throws Exception {
    RandomAccessFile raf = new RandomAccessFile( "c:\\test.txt", "rw" );
    FileChannel fc = raf.getChannel();

    MappedByteBuffer mbb = fc.map( FileChannel.MapMode.READ_WRITE,
      start, size );

    mbb.put( 0, (byte)97 );
    mbb.put( 1023, (byte)122 );

    raf.close();
}

}</code></pre>

 

關于緩沖區的細節內容,我們已經用了兩篇文章來介紹。在下一篇中將會介紹NIO中更有趣的部分Nonblocking I/O。

 

網絡傳輸中的NIO

Java NIO是在jdk1.4開始使用的,它既可以說成“新IO”,也可以說成非阻塞式I/O。下面是java NIO的工作原理:

  • 由一個專門的線程來處理所有的IO事件,并負責分發。

  • 事件驅動機制:事件到的時候觸發,而不是同步的去監視事件。

  • 線程通訊:線程之間通過wait,notify等方式通訊。保證每次上下文切換都是有意義的。減少無謂的線程切換。(//核心所在)

閱讀過一些資料之后,下面貼出我理解的java  NIO的工作原理圖:

注:每個線程的處理流程大概都是讀取數據,解碼,計算處理,編碼,發送響應。

Java NIO的服務端只需啟動一個專門的線程來處理所有的IO事件,這種通信模型是怎么實現的呢?呵呵,我們一起來探究它的奧秘吧。java NIO采用了雙向通道(channel)進行數據傳輸,而不是單向流(stream),在通道上可以注冊我們感興趣的事件。一共有以下四種事件:

事件名                                                 對應值                                                         
服務端接收客戶端連接事件 SelectionKey.OP_ACCEPT(16)
客戶端連接服務端事件 SelectionKey.OP_CONNECT(8)
讀事件 SelectionKey.OP_READ(1)
寫事件 SelectionKey.OP_WRITE(4)

服務端和客戶端各自維護一個管理通道的對象,我們稱之為selector,該對象能檢測一個或多個通道(channel)上的事件。我們以服務端為例,如果服務端的selector上注冊了讀事件,某時刻客戶端給服務端送了一些數據,阻塞I/O這時會調用read()方法阻塞地讀取數據,而NIO的服務端會在selector中添加一個讀事件。服務端的處理線程會輪詢地訪問selector,如果訪問selector時發現有感興趣的事件到達,則處理這些事件,如果沒有感興趣的事件到達,則處理線程會一直阻塞直到感興趣的事件到達為止。下面是我理解的java NIO的通信模型示意圖:

為了更好地理解java NIO,下面貼出服務端和客戶端的簡單代碼實現:

服務端

package cn.nio;  
  
import java.io.IOException;  
import java.net.InetSocketAddress;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.ServerSocketChannel;  
import java.nio.channels.SocketChannel;  
import java.util.Iterator;  
  
/ 
  NIO服務端 
  @author 小路 
 */  
public class NIOServer {  
    //通道管理器  
    private Selector selector;  
  
    / 
      獲得一個ServerSocket通道,并對該通道做一些初始化的工作 
      @param port  綁定的端口號 
      @throws IOException 
     /  
    public void initServer(int port) throws IOException {  
        // 獲得一個ServerSocket通道  
        ServerSocketChannel serverChannel = ServerSocketChannel.open();  
        // 設置通道為非阻塞  
        serverChannel.configureBlocking(false);  
        // 將該通道對應的ServerSocket綁定到port端口  
        serverChannel.socket().bind(new InetSocketAddress(port));  
        // 獲得一個通道管理器  
        this.selector = Selector.open();  
        //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_ACCEPT事件,注冊該事件后,  
        //當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。  
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);  
    }  
  
    /* 
      采用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理 
      @throws IOException 
     /  
    
//這里其實相當于一個單線程循環詢問是否有消息到達,如果有就各種處理。

//這里selector.select();目的就是減少上下文切換,內部類似wait()等待,等待的線程不會被進程調度程序調到cpu上運行,如果有消息到了,就notify()通知一下。

//簡單比喻,你點了外賣就等著wait(),如果你的外賣到就會通知notify()你去拿外賣。

    @SuppressWarnings("unchecked")       public void listen() throws IOException {           System.out.println("服務端啟動成功!");           // 輪詢訪問selector           while (true) {               //當注冊的事件到達時,方法返回;否則,該方法會一直阻塞               selector.select();               // 獲得selector中選中的項的迭代器,選中的項為注冊的事件               Iterator ite = this.selector.selectedKeys().iterator();               while (ite.hasNext()) {                   SelectionKey key = (SelectionKey) ite.next();                   // 刪除已選的key,以防重復處理                   ite.remove();                   // 客戶端請求連接事件                   if (key.isAcceptable()) {                       ServerSocketChannel server = (ServerSocketChannel) key                               .channel();                       // 獲得和客戶端連接的通道                       SocketChannel channel = server.accept();                       // 設置成非阻塞                       channel.configureBlocking(false);                          //在這里可以給客戶端發送信息哦                       channel.write(ByteBuffer.wrap(new String("向客戶端發送了一條信息").getBytes()));                       //在和客戶端連接成功之后,為了可以接收到客戶端的信息,需要給通道設置讀的權限。                       channel.register(this.selector, SelectionKey.OP_READ);                                              // 獲得了可讀的事件                   } else if (key.isReadable()) {                           read(key);                   }                  }              }       }       /        處理讀取客戶端發來的信息 的事件        @param key        @throws IOException        /       public void read(SelectionKey key) throws IOException{           // 服務器可讀取消息:得到事件發生的Socket通道           SocketChannel channel = (SocketChannel) key.channel();           // 創建讀取的緩沖區           ByteBuffer buffer = ByteBuffer.allocate(10);           channel.read(buffer);           byte[] data = buffer.array();           String msg = new String(data).trim();           System.out.println("服務端收到信息:"+msg);           ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());           channel.write(outBuffer);// 將消息回送給客戶端       }              /        啟動服務端測試        @throws IOException        /       public static void main(String[] args) throws IOException {           NIOServer server = new NIOServer();           server.initServer(8000);           server.listen();       }      }  </code></pre>

客戶端

package cn.nio;  
  
import java.io.IOException;  
import java.net.InetSocketAddress;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.SocketChannel;  
import java.util.Iterator;  
  
/** 
  NIO客戶端 
  @author 小路 
 /  
public class NIOClient {  
    //通道管理器  
    private Selector selector;  
  
    / 
      獲得一個Socket通道,并對該通道做一些初始化的工作 
      @param ip 連接的服務器的ip 
      @param port  連接的服務器的端口號          
      @throws IOException 
     */  
    public void initClient(String ip,int port) throws IOException {  
        // 獲得一個Socket通道  
        SocketChannel channel = SocketChannel.open();  
        // 設置通道為非阻塞  
        channel.configureBlocking(false);  
        // 獲得一個通道管理器  
        this.selector = Selector.open();  
          
        // 客戶端連接服務器,其實方法執行并沒有實現連接,需要在listen()方法中調  
        //用channel.finishConnect();才能完成連接  
        channel.connect(new InetSocketAddress(ip,port));  
        //將通道管理器和該通道綁定,并為該通道注冊SelectionKey.OP_CONNECT事件。  
        channel.register(selector, SelectionKey.OP_CONNECT);  
    }  
  
    / 
      采用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理 
      @throws IOException 
     /  
    @SuppressWarnings("unchecked")  
    public void listen() throws IOException {  
        // 輪詢訪問selector  
        while (true) {  
            selector.select();  
            // 獲得selector中選中的項的迭代器  
            Iterator ite = this.selector.selectedKeys().iterator();  
            while (ite.hasNext()) {  
                SelectionKey key = (SelectionKey) ite.next();  
                // 刪除已選的key,以防重復處理  
                ite.remove();  
                // 連接事件發生  
                if (key.isConnectable()) {  
                    SocketChannel channel = (SocketChannel) key  
                            .channel();  
                    // 如果正在連接,則完成連接  
                    if(channel.isConnectionPending()){  
                        channel.finishConnect();  
                          
                    }  
                    // 設置成非阻塞  
                    channel.configureBlocking(false);  
  
                    //在這里可以給服務端發送信息哦  
                    channel.write(ByteBuffer.wrap(new String("向服務端發送了一條信息").getBytes()));  
                    //在和服務端連接成功之后,為了可以接收到服務端的信息,需要給通道設置讀的權限。  
                    channel.register(this.selector, SelectionKey.OP_READ);  
                      
                    // 獲得了可讀的事件  
                } else if (key.isReadable()) {  
                        read(key);  
                }  
  
            }  
  
        }  
    }  
    /** 
      處理讀取服務端發來的信息 的事件 
      @param key 
      @throws IOException  
     /  
    public void read(SelectionKey key) throws IOException{  
        //和服務端的read方法一樣  
    }  
      
      
    /** 
      啟動客戶端測試 
      @throws IOException  
     /  
    public static void main(String[] args) throws IOException {  
        NIOClient client = new NIOClient();  
        client.initClient("localhost",8000);  
        client.listen();  
    }  
  
}</code></pre> 
  

 

 本文由用戶 s.w.pollux 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!