• Home
  • Articles
    • 日志
    • 妍小言
    • 舒小书
    • 浩然说
    • 生活日记
  • All Tags

NIO基础相关

20 May 2017

Reading time ~2 minutes

java.nio核心类简介

Channel: 通信通道,用来维护双方间的数据通信
Selector: 选择器,感知对方到本地通信数据的变化,并将通道所对应的key返回
SelectionKey: 通道往选择器中注册成功之后返回的key

相关API

socketChannel.write(writeBuffer):给予对方响应并写入数据。
Selector.open():开启一个选择器。
ServerSocketChannel.open(): 开启serverSocketChannel
channel.register(selector, SelectionKey.OP_ACCEPT):向选择器注册通道,并指定感兴趣的操作位。
channel.register(selector, SelectionKey.OP_READ, new ServerConnectionAttach()):向选择器注册通道,并绑定一个附件对象到返回的key上;该附件会一直被key持有,直到被新的附件替换(或key本身被回收)。
key.channel():通过key获取其对应的已注册channel。
selector.keys():返回选择器中已注册的所有key,不管该key对应的通道是否已准备好。
selector.select():该方法会阻塞,直到至少有一个已注册的通道就绪、selector.wakeup()被调用或当前线程被中断时才返回;执行过程中会将已取消的key从各个键集中删除。
selector.selectedKeys():返回已选择键集,即对应通道已经在所关注的操作位上准备就绪的key。已选择键集中的key可以被移除(通过集合或其迭代器的remove方法),但不能直接添加,尝试添加会抛出UnsupportedOperationException。处理完一个key后必须手动将其移除,否则下次select之后它仍会留在该集合中。

注:注册动作只与选择器和通道有关,与操作位无关,如果重复注册,会返回已注册的key,并更改操作位。

选择器注册的操作位

SelectionKey#OP_READ:表示通道中有数据可读(或对端已关闭、读操作会立即返回)。
SelectionKey#OP_WRITE:表示通道可写。
SelectionKey#OP_CONNECT:表示通道的连接过程已可以完成(成功或失败),此时应调用finishConnect()。
SelectionKey#OP_ACCEPT:表示服务端通道上有新的连接可以被接受(accept)。

DEMO

客户端

package org.eddy.io;

import lombok.Getter;
import lombok.Setter;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo NIO client. Connects to 127.0.0.1:8081 in non-blocking mode and, driven
 * by a single {@link Selector}, alternates between sending a message and
 * reading the server's reply. It stops after {@code MAX_REPLIES} replies, when
 * the server closes the connection, or on the first I/O error, and always
 * releases its selector and channel before {@link #run()} returns.
 *
 * <p>An instance is single-use: once {@link #run()} has returned, its
 * resources are closed.
 *
 * <p>Created by eddy on 2017/5/20.
 */
@Getter
@Setter
public class Client implements Runnable{

    /** Number of server replies to read before the client stops. */
    private static final int MAX_REPLIES = 3;

    private Selector selector;
    private SocketChannel sc;
    /** Counts the replies read from the server so far. */
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    {
        try {
            selector = Selector.open();
            sc = SocketChannel.open();
            sc.configureBlocking(false);
            if (sc.connect(new InetSocketAddress("127.0.0.1", 8081))) {
                // connect() may complete immediately for a local connection; in that
                // case OP_CONNECT would never be reported, so proceed right away.
                onConnected(sc.register(selector, 0));
            } else {
                sc.register(selector, SelectionKey.OP_CONNECT);
            }
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leave a half-initialised selector/channel behind; run() checks for this.
            closeResources();
        }
    }

    /**
     * Runs the select loop until {@code MAX_REPLIES} replies have been read,
     * the server closes the connection, or an error occurs. The selector and
     * channel are closed when this method returns.
     */
    @Override
    public void run() {
        if (selector == null || !selector.isOpen() || sc == null || !sc.isOpen()) {
            System.err.println("client not initialized");
            return;
        }
        try {
            while (atomicInteger.get() < MAX_REPLIES) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (!key.isValid()) {
                        // Readiness queries on a cancelled key would throw.
                        continue;
                    }
                    if (key.isConnectable()) {
                        handConnect(key);
                    } else if (key.isWritable()) {
                        handWrite(key);
                    } else if (key.isReadable()) {
                        int read = handRead(key);
                        if (read < 0) {
                            System.out.println("server closed connection");
                            return;
                        }
                        if (read > 0) {
                            atomicInteger.incrementAndGet();
                        }
                    }
                }
            }
            System.out.println("end client");
        } catch (Exception e) {
            // This client owns a single connection, so any failure ends the loop
            // instead of retrying on a broken channel. Caught here because an
            // exception escaping a task submitted to an executor is otherwise
            // only visible through the returned Future.
            e.printStackTrace();
        } finally {
            closeResources();
        }
    }

    /**
     * Sends one message to the server and switches the key to wait for the reply.
     *
     * @param key the key of the connected channel
     * @throws IOException if the write fails
     */
    private void handWrite(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        // NOTE(review): a single non-blocking write() may be partial; assumed
        // sufficient for this tiny payload -- confirm before reusing for larger data.
        channel.write(ByteBuffer.wrap("send to server".getBytes("UTF-8")));
        key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Reads one reply from the server, prints it and switches the key back to writing.
     *
     * @param key the key of the connected channel
     * @return the number of bytes read; {@code 0} if nothing was available,
     *         {@code -1} if the server has closed the connection
     * @throws IOException if the read fails
     */
    private int handRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 1K
        int read = channel.read(byteBuffer);
        if (read <= 0) {
            // Nothing to print; the caller decides what to do on end-of-stream.
            return read;
        }
        byteBuffer.flip();
        byte[] data = new byte[byteBuffer.remaining()];
        byteBuffer.get(data);
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.println("client read");
        System.out.println(new String(data, "UTF-8"));
        return read;
    }

    /**
     * Completes a pending non-blocking connect and, once connected, sends the greeting.
     *
     * @param key the key that reported {@code OP_CONNECT}
     * @throws IOException if the connection attempt failed
     */
    private void handConnect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (!channel.finishConnect()) {
            // Connection is not complete yet; keep waiting on OP_CONNECT.
            return;
        }
        onConnected(key);
    }

    /**
     * Sends the initial greeting on a freshly connected channel and starts the
     * write/read ping-pong by registering interest in {@code OP_WRITE}.
     *
     * @param key the key of the connected channel
     * @throws IOException if the write fails
     */
    private void onConnected(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(ByteBuffer.wrap("client send to server\n".getBytes("UTF-8")));
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.println("connect to server");
    }

    /** Closes the channel and the selector, reporting (but not propagating) close failures. */
    private void closeResources() {
        if (sc != null) {
            try {
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(client);
        // Already-submitted tasks still run after shutdown(); without it the
        // executor's non-daemon worker thread keeps the JVM alive after run() returns.
        executor.shutdown();
    }
}

服务端

package org.eddy.io;

import lombok.Getter;
import lombok.Setter;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo NIO server. Listens on port 8081 in non-blocking mode and, driven by a
 * single {@link Selector}, answers every message it reads from a client with a
 * fixed reply. It stops after {@code MAX_REPLIES} replies have been written
 * (counted across all connections) and closes the selector, the listening
 * channel and every accepted connection before {@link #run()} returns.
 *
 * <p>An instance is single-use: once {@link #run()} has returned, its
 * resources are closed.
 *
 * <p>Created by eddy on 2017/5/20.
 */
@Getter
@Setter
public class Server implements Runnable {

    /** Number of replies to send before the server stops. */
    private static final int MAX_REPLIES = 3;

    private ServerSocketChannel ssc;
    private Selector selector;
    /** Counts the replies written so far. */
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    {
        try {
            selector = Selector.open();
            // Assign before bind() so the channel can still be closed if bind() fails.
            ssc = ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(8081), 500);
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leave a half-initialised selector/channel behind; run() checks for this.
            closeResources();
        }
    }

    /**
     * Runs the select loop until {@code MAX_REPLIES} replies have been written
     * or the selector fails. An I/O error on a single client connection closes
     * only that connection. All resources are closed when this method returns.
     */
    @Override
    public void run() {
        if (selector == null || !selector.isOpen() || ssc == null || !ssc.isOpen()) {
            System.err.println("server not initialized");
            return;
        }
        try {
            while (atomicInteger.get() < MAX_REPLIES) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (!key.isValid()) {
                        // Readiness queries on a cancelled key would throw.
                        continue;
                    }
                    try {
                        if (key.isAcceptable()) {
                            handAccept(key);
                        } else if (key.isWritable()) {
                            handWrite(key);
                            atomicInteger.incrementAndGet();
                        } else if (key.isReadable()) {
                            handRead(key);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        // A broken client connection must not be selected again;
                        // the listening channel itself is kept open.
                        if (key.channel() instanceof SocketChannel) {
                            closeKey(key);
                        }
                    }
                }
            }
            System.out.println("end server");
        } catch (Exception e) {
            // Caught here because an exception escaping a task submitted to an
            // executor is otherwise only visible through the returned Future.
            e.printStackTrace();
        } finally {
            closeResources();
        }
    }

    /**
     * Reads one message from a client, prints it and switches the key to writing
     * so that a reply is sent. Closes the connection when the client has closed
     * its side.
     *
     * @param key the key of the client channel
     * @throws IOException if the read fails
     */
    private void handRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 1k
        int read = channel.read(byteBuffer);
        if (read < 0) {
            // End of stream: the client is gone, so there is nobody to reply to.
            closeKey(key);
            return;
        }
        if (read == 0) {
            // Nothing available; keep waiting for data.
            return;
        }
        byteBuffer.flip();
        byte[] data = new byte[byteBuffer.remaining()];
        byteBuffer.get(data);
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.println("server read");
        System.out.println(new String(data, "UTF-8"));
    }

    /**
     * Sends the reply to a client and switches the key back to reading.
     *
     * @param key the key of the client channel
     * @throws IOException if the write fails
     */
    private void handWrite(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        // NOTE(review): a single non-blocking write() may be partial; assumed
        // sufficient for this tiny payload -- confirm before reusing for larger data.
        channel.write(ByteBuffer.wrap("server send to client\n".getBytes("UTF-8")));
        key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Accepts a pending connection and registers it with the selector for reading.
     *
     * @param key the key of the listening channel
     * @throws IOException if accepting or configuring the connection fails
     */
    private void handAccept(SelectionKey key) throws IOException {
        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
        SocketChannel channel = listener.accept();
        if (channel == null) {
            // In non-blocking mode accept() returns null when no connection is pending.
            return;
        }
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ);
        System.out.println("accept client");
    }

    /** Closes the channel behind {@code key}, which also cancels the key. */
    private void closeKey(SelectionKey key) {
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes every channel registered with the selector, the selector itself and
     * the listening channel, reporting (but not propagating) close failures.
     */
    private void closeResources() {
        if (selector != null && selector.isOpen()) {
            // Iterate over a snapshot so closing channels cannot disturb the iteration.
            for (SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
                closeKey(key);
            }
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (ssc != null) {
            try {
                ssc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(server);
        // Already-submitted tasks still run after shutdown(); without it the
        // executor's non-daemon worker thread keeps the JVM alive after run() returns.
        executor.shutdown();
    }
}



NIO