高級Java工程師必備 —– 深入分析 Java IO (二)NIO

接着上一篇文章 高級Java工程師必備 —– 深入分析 Java IO (一)BIO,我們來講講NIO

多路復用IO模型

場景描述

一個餐廳同時有100位客人到店,當然到店后第一件要做的事情就是點菜。但是問題來了,餐廳老闆為了節約人力成本目前只有一位大堂服務員拿着唯一的一本菜單等待客人進行服務。

方法A: 無論有多少客人等待點餐,服務員都把僅有的一份菜單遞給其中一位客人,然後站在客人身旁等待這個客人完成點菜過程。在記錄客人點菜內容后,把點菜記錄交給後堂廚師。然後是第二位客人。。。。然後是第三位客人。很明顯,只有腦袋被門夾過的老闆,才會這樣設置服務流程。因為隨後的80位客人,再等待超時后就會離店(還會給差評)。

方法B: 老闆馬上新雇傭99名服務員,同時印製99本新的菜單。每一名服務員手持一本菜單負責一位客人(關鍵不只在於服務員,還在於菜單。因為沒有菜單客人也無法點菜)。在客人點完菜后,記錄點菜內容交給後堂廚師(當然為了更高效,後堂廚師最好也有100名)。這樣每一位客人享受的就是VIP服務咯,當然客人不會走,但是人力成本可是一個大頭哦(虧死你)。

方法C: 就是改進點菜的方式,當客人到店后,自己申請一本菜單。想好自己要點的才后,就呼叫服務員。服務員站在自己身邊後記錄客人的菜單內容。將菜單遞給廚師的過程也要進行改進,並不是每一份菜單記錄好以後,都要交給後堂廚師。服務員可以記錄號多份菜單后,同時交給廚師就行了。那麼這種方式,對於老闆來說人力成本是最低的;對於客人來說,雖然不再享受VIP服務並且要進行一定的等待,但是這些都是可接受的;對於服務員來說,基本上她的時間都沒有浪費,基本上被老闆壓桿了最後一滴油水。

到店情況:併發量。到店情況不理想時,一個服務員一本菜單,當然是足夠了。所以不同的老闆在不同的場合下,將會靈活選擇服務員和菜單的配置。
客人:客戶端請求
點餐內容:客戶端發送的實際數據
服務員:操作系統內核用於IO操作的線程(內核線程)
廚師:應用程序線程(當然廚房就是應用程序進程咯)
餐單傳遞方式:包括了阻塞式和非阻塞式兩種。

  • 方法A:阻塞式/非阻塞式 同步IO
  • 方法B:使用線程進行處理的 阻塞式/非阻塞式 同步IO
  • 方法C:阻塞式/非阻塞式 多路復用IO

多路復用IO技術最適用的是“高併發”場景,所謂高併發是指1毫秒內至少同時有上千個連接請求準備好。其他情況下多路復用IO技術發揮不出來它的優勢。另一方面,使用JAVA NIO進行功能實現,相對於傳統的Socket套接字實現要複雜一些,所以實際應用中,需要根據自己的業務需求進行技術選擇。

NIO

概念

JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提高速度。實際上,“舊”的I/O包已經使用NIO重新實現過,即使我們不顯式的使用NIO編程,也能從中受益。速度的提高在文件I/O和網絡I/O中都可能會發生,但本文只討論後者。

NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

面向流與面向緩衝

Java IO和NIO之間第一個最大的區別是,IO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味着每次從流中讀一個或多個字節,直至讀取所有字節,它們沒有被緩存在任何地方。此外,它不能前後移動流中的數據。如果需要前後移動從流中讀取的數據,需要先將它緩存到一個緩衝區。

面向塊的 NIO一次處理一個數據塊,按塊處理數據比按流處理數據要快得多。數據讀取到一個它稍後處理的緩衝區,需要時可在緩衝區中前後移動。這就增加了處理過程中的靈活性。但是,還需要檢查是否該緩衝區中包含所有您需要處理的數據。而且,需確保當更多的數據讀入緩衝區時,不要覆蓋緩衝區里尚未處理的數據。

阻塞與非阻塞IO

Java IO的各種流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據完全寫入。該線程在此期間不能再干任何事情了。Java NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,但是它僅能得到目前可用的數據,如果目前沒有數據可用時,就什麼都不會獲取,而不是保持線程阻塞,所以直至數據變的可以讀取之前,該線程可以繼續做其他的事情。 非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不需要等待它完全寫入,這個線程同時可以去做別的事情。 線程通常將非阻塞IO的空閑時間用於在其它通道上執行IO操作,所以一個單獨的線程現在可以管理多個輸入和輸出通道(channel)

通道

通道 Channel 是對原 I/O 包中的流的模擬,可以通過它讀取和寫入數據。

通道與流的不同之處在於,流只能在一個方向上移動(一個流必須是 InputStream 或者 OutputStream 的子類),而通道是雙向的,可以用於讀、寫或者同時用於讀寫。

通道包括以下類型:

  • FileChannel:從文件中讀寫數據;
  • DatagramChannel:通過 UDP 讀寫網絡中數據;
  • SocketChannel:通過 TCP 讀寫網絡中數據;
  • ServerSocketChannel:可以監聽新進來的 TCP 連接,對每一個新進來的連接都會創建一個 SocketChannel。

緩衝區

發送給一個通道的所有數據都必須首先放到緩衝區中,同樣地,從通道中讀取的任何數據都要先讀到緩衝區中。也就是說,不會直接對通道進行讀寫數據,而是要先經過緩衝區。

緩衝區實質上是一個數組,但它不僅僅是一個數組。緩衝區提供了對數據的結構化訪問,而且還可以跟蹤系統的讀/寫進程。

Buffer有兩種工作模式:寫模式和讀模式。在讀模式下,應用程序只能從Buffer中讀取數據,不能進行寫操作。但是在寫模式下,應用程序是可以進行讀操作的,這就表示可能會出現臟讀的情況。所以一旦您決定要從Buffer中讀取數據,一定要將Buffer的狀態改為讀模式。

注意:ServerSocketChannel通道它只支持對OP_ACCEPT事件的監聽,所以它是不能直接進行網絡數據內容的讀寫的。所以ServerSocketChannel是沒有集成Buffer的。

緩衝區包括以下類型:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

 

 

可以用三個值指定緩衝區在任意時刻的狀態:

  • position
  • limit
  • capacity

Position

您可以回想一下,緩衝區實際上就是美化了的數組。在從通道讀取時,您將所讀取的數據放到底層的數組中。 position 變量跟蹤已經寫了多少數據。更準確地說,它指定了下一個字節將放到數組的哪一個元素中。因此,如果您從通道中讀三個字節到緩衝區中,那麼緩衝區的 position 將會設置為3,指向數組中第四個元素。

同樣,在寫入通道時,您是從緩衝區中獲取數據。 position 值跟蹤從緩衝區中獲取了多少數據。更準確地說,它指定下一個字節來自數組的哪一個元素。因此如果從緩衝區寫了5個字節到通道中,那麼緩衝區的 position 將被設置為5,指向數組的第六個元素。

Limit

limit 變量表明還有多少數據需要取出(在從緩衝區寫入通道時),或者還有多少空間可以放入數據(在從通道讀入緩衝區時)。

position 總是小於或者等於 limit

Capacity

緩衝區的 capacity 表明可以儲存在緩衝區中的最大數據容量。實際上,它指定了底層數組的大小 ― 或者至少是指定了准許我們使用的底層數組的容量。

limit 決不能大於 capacity

 

在實際操作數據時它們有如下關係圖:

① 新建一個大小為 8 個字節的緩衝區,此時 position 為 0,而 limit = capacity = 8。capacity 變量不會改變,下面的討論會忽略它。

② 從輸入通道中讀取 5 個字節數據寫入緩衝區中,此時 position 為 5,limit 保持不變。

③ 在將緩衝區的數據寫到輸出通道之前,需要先調用 flip() 方法,這個方法將 limit 設置為當前 position,並將 position 設置為 0。

④ 從緩衝區中取 4 個字節到輸出緩衝中,此時 position 設為 4。

⑤ 最後需要調用 clear() 方法來清空緩衝區,此時 position 和 limit 都被設置為最初位置。

 

文件複製 NIO 實例

以下展示了使用 NIO 快速複製文件的實例:

public static void fastCopy(String src, String dist) throws IOException {

    /* 獲得源文件的輸入字節流 */
    FileInputStream fin = new FileInputStream(src);

    /* 獲取輸入字節流的文件通道 */
    FileChannel fcin = fin.getChannel();

    /* 獲取目標文件的輸出字節流 */
    FileOutputStream fout = new FileOutputStream(dist);

    /* 獲取輸出字節流的文件通道 */
    FileChannel fcout = fout.getChannel();

    /* 為緩衝區分配 1024 個字節 */
    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

    while (true) {

        /* 從輸入通道中讀取數據到緩衝區中 */
        int r = fcin.read(buffer);

        /* read() 返回 -1 表示 EOF */
        if (r == -1) {
            break;
        }

        /* 切換讀寫 */
        buffer.flip();

        /* 把緩衝區的內容寫入輸出文件中 */
        fcout.write(buffer);

        /* 清空緩衝區 */
        buffer.clear();
    }
}

選擇器

NIO 常常被叫做非阻塞 IO,主要是因為 NIO 在網絡通信中的非阻塞特性被廣泛使用。

NIO 實現了 IO 多路復用中的 Reactor 模型,一個線程 Thread 使用一個選擇器 Selector 通過輪詢的方式去監聽多個通道 Channel 上的事件,從而讓一個線程就可以處理多個事件。

通過配置監聽的通道 Channel 為非阻塞,那麼當 Channel 上的 IO 事件還未到達時,就不會進入阻塞狀態一直等待,而是繼續輪詢其它 Channel,找到 IO 事件已經到達的 Channel 執行。

例如,當多個客戶端通過通道向服務端傳輸數據時,是通過 ByteBuffer 來傳輸,一個文件通過多次,從輸入通道中讀取 N 個字節數據寫入ByteBuffer,然後再將將緩衝區的數據寫到輸出通道,這個過程可以看成是不連續的,因為只有當緩衝區寫滿后,通過 buffer.flip() 切換成讀模式后,才開始向輸出通道寫入,所以當ByteBuffer還在寫入狀態時,服務器是不會等待這個通道的ByteBuffer寫滿,而是去處理其他客戶端Channel 為可讀的狀態,當然這個處理業務的工作可以開啟多線程來處理。

因為創建和切換線程的開銷很大,因此使用一個線程來處理多個事件而不是一個線程處理一個事件,對於 IO 密集型的應用具有很好地性能。

應該注意的是,只有套接字 Channel 才能配置為非阻塞,而 FileChannel 不能,為 FileChannel 配置非阻塞也沒有意義。

套接字 NIO 實例

package com.chenhao.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

import org.junit.Test;

/*
 * 一、使用 NIO 完成網絡通信的三個核心:
 * 
 * 1. 通道(Channel):負責連接
 *         
 *        java.nio.channels.Channel 接口:
 *             |--SelectableChannel
 *                 |--SocketChannel
 *                 |--ServerSocketChannel
 *                 |--DatagramChannel
 * 
 * 2. 緩衝區(Buffer):負責數據的存取
 * 
 * 3. 選擇器(Selector):是 SelectableChannel 的多路復用器。用於監控 SelectableChannel 的 IO 狀況
 * 
 */
public class TestNonBlockingNIO {
    
    //客戶端
    @Test
    public void client() throws IOException{
        //1. 獲取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        
        //2. 切換非阻塞模式
        sChannel.configureBlocking(false);
        
        //3. 分配指定大小的緩衝區
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        //4. 發送數據給服務端
        Scanner scan = new Scanner(System.in);
        
        while(scan.hasNext()){
            String str = scan.next();
            buf.put((new Date().toString() + "\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        
        //5. 關閉通道
        sChannel.close();
    }

    //服務端
    @Test
    public void server() throws IOException{
        //1. 獲取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        
        //2. 切換非阻塞模式
        ssChannel.configureBlocking(false);
        
        //3. 綁定連接
        ssChannel.bind(new InetSocketAddress(9898));
        
        //4. 獲取選擇器
        Selector selector = Selector.open();
        
        //5. 將通道註冊到選擇器上, 並且指定“監聽接收事件”
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        //6. 輪詢式的獲取選擇器上已經“準備就緒”的事件
        //使用 select() 來監聽到達的事件,它會一直阻塞直到有至少一個事件到達。
        while(selector.select() > 0){
            
            //7. 獲取當前選擇器中所有註冊的“選擇鍵(已就緒的監聽事件)”
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            
            while(it.hasNext()){
                //8. 獲取準備“就緒”的是事件
                SelectionKey sk = it.next();
                
                //9. 判斷具體是什麼事件準備就緒
                if(sk.isAcceptable()){
                    //10. 若“接收就緒”,獲取客戶端連接
                    SocketChannel sChannel = ssChannel.accept();
                    
                    //11. 切換非阻塞模式
                    sChannel.configureBlocking(false);
                    
                    //12. 將該通道註冊到選擇器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                }else if(sk.isReadable()){
                    //13. 獲取當前選擇器上“讀就緒”狀態的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();
                    
                    //14. 讀取數據
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    
                    int len = 0;
                    while((len = sChannel.read(buf)) > 0 ){
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    }
                }
                
                //15. 取消選擇鍵 SelectionKey
                //每一個“事件關鍵字”被處理后都必須移除,否則下一次輪詢時,這個事件會被重複處理
                it.remove();
            }
        }
    }
}

NIO傳輸文件

服務器端代碼

public class Server {
    private ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
        //使用Map保存每個連接,當OP_READ就緒時,根據key找到對應的文件對其進行寫入。若將其封裝成一個類,作為值保存,可以再上傳過程中显示進度等等
    Map<SelectionKey, FileChannel> fileMap = new HashMap<SelectionKey, FileChannel>();
    public static void main(String[] args) throws IOException{
        Server server = new Server();
        server.startServer();
    }
    public void startServer() throws IOException{
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(8888));
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服務器已開啟...");
        while (true) {
            int num = selector.select();
            if (num == 0) continue;
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                if (key.isAcceptable()) {
                    ServerSocketChannel serverChannel1 = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = serverChannel1.accept();
                    if (socketChannel == null) continue;
                    socketChannel.configureBlocking(false);
                    SelectionKey key1 = socketChannel.register(selector, SelectionKey.OP_READ);
                    InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.getRemoteAddress();
                    File file = new File(remoteAddress.getHostName() + "_" + remoteAddress.getPort() + ".txt");
                    FileChannel fileChannel = new FileOutputStream(file).getChannel();
                    fileMap.put(key1, fileChannel);
                    System.out.println(socketChannel.getRemoteAddress() + "連接成功...");
                    writeToClient(socketChannel);
                }
                else if (key.isReadable()){
                    readData(key);
                }
                // NIO的特點只會累加,已選擇的鍵的集合不會刪除,ready集合會被清空
                // 只是臨時刪除已選擇鍵集合,當該鍵代表的通道上再次有感興趣的集合準備好之後,又會被select函數選中
                it.remove();
            }
        }
    }
    private void writeToClient(SocketChannel socketChannel) throws IOException {
        buffer.clear();
        buffer.put((socketChannel.getRemoteAddress() + "連接成功").getBytes());
        buffer.flip();
        socketChannel.write(buffer);
        buffer.clear();
    }
    private void readData(SelectionKey key) throws IOException  {
        FileChannel fileChannel = fileMap.get(key);
        buffer.clear();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int num = 0;
        try {
            while ((num = socketChannel.read(buffer)) > 0) {
                buffer.flip();
                // 寫入文件
                fileChannel.write(buffer);
                buffer.clear();
                }
        } catch (IOException e) {
            key.cancel();
            e.printStackTrace();
            return;
        }
        // 調用close為-1 到達末尾
        if (num == -1) {
            fileChannel.close();
            System.out.println("上傳完畢");
            buffer.put((socketChannel.getRemoteAddress() + "上傳成功").getBytes());
            buffer.clear();
            socketChannel.write(buffer);
            key.cancel();
        }
    }
}

 

客戶端模擬三個客戶端同時向服務器發送文件

public class Client {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            // 模擬三個發端
            new Thread() {
                public void run() {
                    try {
                        SocketChannel socketChannel = SocketChannel.open();
                        socketChannel.socket().connect(new InetSocketAddress("127.0.0.1", 8888));
                        File file = new File("E:\\" + 11 + ".txt");
                        FileChannel fileChannel = new FileInputStream(file).getChannel();
                        ByteBuffer buffer = ByteBuffer.allocate(100);
                        socketChannel.read(buffer);
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, buffer.limit(), Charset.forName("utf-8")));
                        buffer.clear();
                        int num = 0;
                        while ((num=fileChannel.read(buffer)) > 0) {
                            buffer.flip();                        
                            socketChannel.write(buffer);
                            buffer.clear();
                        }
                        if (num == -1) {
                            fileChannel.close();
                            socketChannel.shutdownOutput();
                        }
                        // 接受服務器
                        socketChannel.read(buffer);
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, buffer.limit(), Charset.forName("utf-8")));
                        buffer.clear();
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    
                };
            }.start();
            
        }
        Thread.yield();
    }
}

可見這裏我們僅僅使用了一個線程就管理了三個連接,相比以前使用阻塞的Socket要在accept函數返回后開啟線程來管理這個連接,而使用NIO我們在accept返回后,僅僅將其註冊到選擇器上,讀操作在下次檢測到有可讀的鍵的集合時就會去處理。

NIO+線程池改進

public class ThreadPoolServer extends Server{
    private ExecutorService exec = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws IOException {
        ThreadPoolServer server = new ThreadPoolServer();
        server.startServer();
    }

    @Override
    protected void readData(final SelectionKey key) throws IOException {
        // 移除掉這個key的可讀事件,已經在線程池裡面處理,如果不改變當前Key的狀態,這裏交給另外一個線程去處理,主線程下一次遍歷此KEY還是可讀事件,會重複開啟線程處理任務
        key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
        exec.execute(new Runnable() {
            @Override
            public void run() {
                ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                FileChannel fileChannel = fileMap.get(key);
                buffer.clear();
                SocketChannel socketChannel = (SocketChannel) key.channel();
                int num = 0;
                try {
                    while ((num = socketChannel.read(buffer)) > 0) {
                        buffer.flip();
                        // 寫入文件
                        fileChannel.write(buffer);
                        buffer.clear();
                    }
                } catch (IOException e) {
                    key.cancel();
                    e.printStackTrace();
                    return;
                }
                // 調用close為-1 到達末尾
                if (num == -1) {
                    try {
                        fileChannel.close();
                        System.out.println("上傳完畢");
                        buffer.put((socketChannel.getRemoteAddress() + "上傳成功").getBytes());
                        buffer.clear();
                        socketChannel.write(buffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    // 只有調用cancel才會真正從已選擇的鍵的集合裏面移除,否則下次select的時候又能得到
                    // 一端close掉了,其對端仍然是可讀的,讀取得到EOF,返回-1
                    key.cancel(); 
                    return;
                }
                // Channel的read方法可能返回0,返回0並不一定代表讀取完了。
                // 工作線程結束對通道的讀取,需要再次更新鍵的ready集合,將感興趣的集合重新放在裏面
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                // 調用wakeup,使得選擇器上的第一個還沒有返回的選擇操作立即返回即重新select
                key.selector().wakeup();
            }
        });
    }
}

推薦博客

  程序員寫代碼之外,如何再賺一份工資?

多路復用IO的優缺點

  • 不用再使用多線程來進行IO處理了(包括操作系統內核IO管理模塊和應用程序進程而言)。當然實際業務的處理中,應用程序進程還是可以引入線程池技術的
  • 同一個端口可以處理多種協議,例如,使用ServerSocketChannel的服務器端口監聽,既可以處理TCP協議又可以處理UDP協議。
  • 操作系統級別的優化:多路復用IO技術可以是操作系統級別在一個端口上能夠同時接受多個客戶端的IO事件。同時具有之前我們講到的阻塞式同步IO和非阻塞式同步IO的所有特點。Selector的一部分作用更相當於“輪詢代理器”。

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

您可能也會喜歡…