java · 20 2 月, 2022 0

Java NIO 代码实现详解

在前面文章中介绍了JAVA中BIO的实现方式以及优化 介绍了Java BIO代码实现以及常见的几种优化方式,在Java后期的迭代中,引入NIO相关的内容, 提高IO的处理效率,这篇文章主要介绍NIO的代码实现。

使用NIO实现BIO

在NIO中,我们也可以实现阻塞IO的功能,功能和BIO保持一致, 同时针对BIO的实现优化方法也是一致的。

package com.jdk.test.demo.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * 使用nio的代码实现bio
 */
public class NioToBioDemo {

    public static void main(String[] args) throws IOException {

        ServerSocketChannel channel = ServerSocketChannel.open();
        // 采用阻塞模式, 在等待客户端链接时,会产生阻塞
        channel.configureBlocking(true);
        // 绑定8080端口
        channel.bind(new InetSocketAddress(8080));
        System.out.println("服务端已经启动, 端口: 8080");

        // 循环等待接收客户端请求, 并将客户端的输入写出到channel
        while (true) {
            // 会一直阻塞,并等到客户端链接
            SocketChannel socketChannel = channel.accept();
            socketChannel.configureBlocking(true);
            printClientInfo(socketChannel);

            handle(socketChannel);
        }
    }

    private static void handle(SocketChannel socketChannel) throws IOException {
        // 从channel中读取数据
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        while (socketChannel.read(byteBuffer) != -1) {

            String msg = readData(byteBuffer);
            System.out.println("接收到客户端消息: " + msg);

            // 将消息写出到客户端
            byteBuffer.flip();
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }

            byteBuffer.compact();
        }
    }

    private static void printClientInfo(SocketChannel channel) throws IOException {
        SocketAddress socketAddress = channel.getRemoteAddress();
        System.out.println("客户端已经链接: " + socketAddress.toString());
    }

    /**
     * 从channel中读取数据
     *
     * @param byteBuffer 字节缓冲对象
     * @return 客户端发送的数据
     * @throws IOException
     */
    private static String readData(ByteBuffer byteBuffer) throws IOException {
        byteBuffer.flip();
        byte[] bytes = new byte[byteBuffer.limit()];
        byteBuffer.get(bytes);

        return new String(bytes);
    }
}

在以上的代码中,实现BIO的关键在于configureBlocking(true)方法的实现,该方法用于标记当前的channel的工作模式:

  • true: 表示blocking(阻塞)工作模式,也就是BIO的实现
  • false: 表示non-blocking(非阻塞)工作模式, 也是NIO的实现

因此针对以上的代码,如果要将BIO改为NIO的实现,其实就只是将configureBlocking(false)即可。

阻塞模式的工作,产生阻塞的点在那些地方呢?

  • 针对ServerSocketChannel而言,阻塞主要在等待客户端链接(accept())方法上
  • 针对SocketChannel而言,阻塞主要在从Socket读取数据(read())和写出数据(write())

NIO non-blocking实现方式

package com.jdk.test.demo.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * 使用nio的代码实现bio
 */
public class NioServerDemo {

    public static void main(String[] args) throws IOException {

        ServerSocketChannel channel = ServerSocketChannel.open();
        // 采用非阻塞模式
        channel.configureBlocking(false);
        // 绑定8080端口
        channel.bind(new InetSocketAddress(8080));
        System.out.println("服务端已经启动, 端口: 8080");

        // 循环等待接收客户端请求, 并将客户端的输入写出到channel
        while (true) {
            // 当处于non-blocking模式的时候, 该方法不会发生阻塞
            SocketChannel socketChannel = channel.accept();

            try {
                if (socketChannel != null) {
                    socketChannel.configureBlocking(false);
                    printClientInfo(socketChannel);

                    // 从channel中读取数据
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int data;
                    while ((data = socketChannel.read(byteBuffer)) > -1) {
                        if (data > 0) {
                            String msg = readData(byteBuffer);
                            System.out.println("接收到客户端消息: " + data + msg);

                            // 将消息写出到客户端
                            byteBuffer.flip();
                            // 异步写出
                            socketChannel.write(byteBuffer);
                            byteBuffer.clear();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                if (socketChannel != null) {
                    socketChannel.close();
                }
            }
        }
    }

    private static void printClientInfo(SocketChannel channel) throws IOException {
        SocketAddress socketAddress = channel.getRemoteAddress();
        System.out.println("客户端已经链接: " + socketAddress.toString());
    }

    /**
     * 从channel中读取数据
     *
     * @param byteBuffer 字节缓冲对象
     * @return 客户端发送的数据
     * @throws IOException
     */
    private static String readData(ByteBuffer byteBuffer) throws IOException {
        byteBuffer.flip();
        byte[] bytes = new byte[byteBuffer.limit()];
        byteBuffer.get(bytes);

        return new String(bytes);
    }
}

当前的实例为nio的non-blocking工作模式, 这个工作模式最大的优点在于取消的阻塞,通过自身的循环实现获取客户端链接,读入和写出数据。对于上面的代码中,有几个点需要注意:

  • 由于是non-blocking工作模式, accept()方法也不是阻塞的等待客户端链接,所以从accept()方法中获取的channel实际上可能为空的,所以需要有判空
  • 在从SocketChannel读取数据的时,也不会产生阻塞,如果没有读取到数据,返回的是0,因此需要判断是否读取到数据

从以上的代码可以分析得知,在non-blocking工作模式中,会不停的判断是否有新的连接,是否有数据读取。因此在链接较多,或者空闲时,都可能会导致CPU使用过高等问题。因此NIO中引入了Selector,可以管理多个Channel。

NIO Selector实现

package com.jdk.test.demo.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
 * 通过nio实现服务端处理客户端请求
 */
public class NioSelectorServerDemo {

    private static Map<SocketChannel, ByteBuffer> writeCache = new HashMap<>();

    public static void main(String[] args) {
        try {
            ServerSocketChannel socketChannel = ServerSocketChannel.open();

            // non-blocking模式
            socketChannel.configureBlocking(false);
            socketChannel.bind(new InetSocketAddress(8080));

            // 将channel与selector进行绑定
            Selector selector = Selector.open();

            int ops = socketChannel.validOps();
            socketChannel.register(selector, ops, null);

            while (true) {
                // 当前selector会阻塞, 直到有事件可以处理位置
                selector.select();

                // 从selector中获取可以处理的事件列表
                Set<SelectionKey> selectionKeys = selector.selectedKeys();

                Iterator<SelectionKey> iterator = selectionKeys.iterator();

                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();

                    // 说明当前的key是一个op_accept事件
                    if (selectionKey.isAcceptable()) {
                        SocketChannel channel = socketChannel.accept();
                        channel.configureBlocking(false);

                        System.out.println("客户端已连接: " + channel.getRemoteAddress().toString());
                        // 绑定op_read事件,等待从客户端读取数据
                        channel.register(selector, SelectionKey.OP_READ, null);
                    } else if (selectionKey.isReadable()) {
                        SocketChannel channel = (SocketChannel) selectionKey.channel();
                        // 从channel中读取数据
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int data = channel.read(byteBuffer);

                        if (data > 0) {
                            String msg = new String(byteBuffer.array(), 0, data);
                            writeCache.put(channel, byteBuffer);
                            System.out.println("接收到客户端消息: " + msg);
                        }

                        byteBuffer.flip();
                        channel.write(byteBuffer);
                        byteBuffer.compact();

//                        channel.register(selector, SelectionKey.OP_WRITE, null);
                    }

                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在上面的实现中,可以看出,本身的NIO ServerSocketChannel还是non-blocking的工作模式,只是本身不需要去判断是否有客户端链接,而是将实现交给了Selector, Selector通过系统的epoll的实现,获取OP_ACCEPT, OP_READ, OP_WRITE事件的Channel, 这样对先前的两个实现有了比较大的提升,主要包含一下方面:

  • Selector同时可以管理多个channel, 并且返回已经能够处理的channel列表,避免CPU使用率过高等问题
  • 通过事件响应机制,避免了对多个channel扫描的流程,能够精确的处理需要处理的数据流,提高效率

因此通过selector管理多个channel, 可以提高服务端的整体性能,后面将介绍selector是如何实现,如果文章有帮助到你,请问文章点赞!!!