java.nio核心类简介
Channel: 通信通道,用来维护双方间的数据通信
Selector: 选择器,监控已注册通道上的就绪事件(可连接、可接受、可读、可写),并返回就绪通道所对应的key
SelectionKey: 通道往选择器中注册成功之后返回的key
相关API
socketChannel.write(writeBuffer):
将writeBuffer中的数据写入通道,发送给对端。
Selector.open():
开启一个选择器。
ServerSocketChannel.open():
开启serverSocketChannel
channel.register(selector, SelectionKey.OP_ACCEPT):
向选择器注册通道,并关注 OP_ACCEPT(接受连接)事件。
channel.register(selector, SelectionKey.OP_READ, new ServerConnectionAttach()):
向选择器注册通道并关注 OP_READ 事件,同时将一个附件对象绑定到返回的key上;该附件会一直保留在key上,直到被新的附件替换。
key.channel():
通过key获取其对应的、已注册到选择器的channel。
selector.keys():
返回选择器中已注册的所有key,不管该key对应的通道是否已准备就绪。
selector.select():
该方法会阻塞,直到至少有一个已注册的通道准备就绪、选择器的wakeup()方法被调用,或当前线程被中断时才返回;选择过程中会将已取消的key从各个键集中移除。
selector.selectedKeys():
返回已选择键集,即对应通道已在其关注的操作上准备就绪的key。可以从该键集中移除key(通过集合或其迭代器的remove方法),但不能直接向其中添加key,否则会抛出UnsupportedOperationException。
注:一个通道在同一个选择器上只对应一个key,与操作位无关;如果重复注册,会返回已注册的key,并将其操作位(及附件)更新为本次注册传入的值。
选择器注册的操作位
SelectionKey#OP_READ:
表示通道中有数据可读(或对端已关闭连接)。
SelectionKey#OP_WRITE:
表示通道已准备好写入数据。
SelectionKey#OP_CONNECT:
表示通道的连接操作已完成或出现错误,此时应调用finishConnect()完成连接。
SelectionKey#OP_ACCEPT:
表示ServerSocketChannel上有新的连接等待接受(可调用accept())。
DEMO
客户端
package org.eddy.io;
import lombok.Getter;
import lombok.Setter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Non-blocking NIO demo client.
 *
 * <p>Connects to {@code 127.0.0.1:8081}, sends a greeting once the connection is
 * established and then alternates between writing a request and reading the
 * reply. The event loop stops after {@code MAX_READS} replies, when the server
 * closes the connection, or on the first I/O error; the channel and the
 * selector are closed on the way out.
 *
 * <p>Created by eddy on 2017/5/20.
 */
@Getter
@Setter
public class Client implements Runnable{

    /** Address of the demo server. */
    private static final String HOST = "127.0.0.1";

    /** Port of the demo server. */
    private static final int PORT = 8081;

    /** Number of replies to read before the client stops. */
    private static final int MAX_READS = 3;

    /** Charset used for every message exchanged with the server. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private Selector selector;
    private SocketChannel sc;
    /** Counts the replies read so far. */
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    {
        try {
            selector = Selector.open();
            sc = SocketChannel.open();
            sc.configureBlocking(false);
            if (sc.connect(new InetSocketAddress(HOST, PORT))) {
                // connect() may complete immediately for a local peer; in that case
                // there is no pending connection to finish, so skip OP_CONNECT.
                sc.register(selector, SelectionKey.OP_WRITE);
                sendGreeting(sc);
                System.out.println("connect to server");
            } else {
                sc.register(selector, SelectionKey.OP_CONNECT);
            }
        } catch (IOException e) {
            // Do not hand out a half-initialised client: release what was opened and fail fast.
            closeResource(sc);
            closeResource(selector);
            throw new IllegalStateException("failed to initialise NIO client for " + HOST + ":" + PORT, e);
        }
    }

    /**
     * Runs the selector loop until {@code MAX_READS} replies were read, the server
     * closed the connection, or an I/O error occurred. Always closes the channel
     * and the selector before returning.
     */
    @Override
    public void run() {
        try {
            boolean connectionOpen = true;
            while (connectionOpen && atomicInteger.get() < MAX_READS) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (connectionOpen && iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isConnectable()) {
                        handConnect(key);
                    } else if (key.isWritable()) {
                        handWrite(key);
                    } else if (key.isReadable()) {
                        if (handRead(key)) {
                            atomicInteger.incrementAndGet();
                        } else {
                            connectionOpen = false;
                        }
                    }
                }
            }
            System.out.println("end client");
        } catch (IOException e) {
            // The client owns a single connection, so an I/O failure ends the loop
            // instead of being retried forever on a dead channel.
            e.printStackTrace();
        } finally {
            closeResource(sc);
            closeResource(selector);
        }
    }

    /**
     * Sends one request and switches the key to wait for the reply.
     *
     * @param key the selected key of the server connection
     * @throws IOException if writing to the channel fails
     */
    private void handWrite(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(ByteBuffer.wrap("send to server".getBytes(UTF_8)));
        key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Reads one reply (at most 1 KiB), prints it and switches the key back to writing.
     *
     * @param key the selected key of the server connection
     * @return {@code true} if a reply was read, {@code false} if the server closed
     *         the connection (the channel is closed in that case)
     * @throws IOException if reading from the channel fails
     */
    private boolean handRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 1K
        if (channel.read(byteBuffer) < 0) {
            // -1 means end-of-stream: the peer is gone, so this is not a reply.
            System.out.println("server closed the connection");
            channel.close();
            return false;
        }
        byteBuffer.flip();
        byte[] data = new byte[byteBuffer.remaining()];
        byteBuffer.get(data);
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.println("client read");
        System.out.println(new String(data, UTF_8));
        return true;
    }

    /**
     * Completes the pending connection, sends the greeting and switches the key to writing.
     *
     * @param key the selected key of the connecting channel
     * @throws IOException if the connection attempt failed or the greeting cannot be written
     */
    private void handConnect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (!channel.finishConnect()) {
            // Still pending: keep OP_CONNECT and wait for the next notification.
            return;
        }
        sendGreeting(channel);
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.println("connect to server");
    }

    /**
     * Writes the initial greeting to the freshly connected channel.
     *
     * @param channel the connected channel
     * @throws IOException if writing to the channel fails
     */
    private static void sendGreeting(SocketChannel channel) throws IOException {
        channel.write(ByteBuffer.wrap("client send to server\n".getBytes(UTF_8)));
    }

    /**
     * Closes the given resource, reporting (not propagating) any failure.
     *
     * @param resource the resource to close; may be {@code null}
     */
    private static void closeResource(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the client on a single worker thread and lets the JVM exit once it finishes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Client client = new Client();
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        // execute() rather than submit(): an unexpected runtime failure is reported
        // by the worker thread instead of being hidden in an unread Future.
        executor.execute(client);
        // Without shutdown() the idle worker thread keeps the JVM alive forever.
        executor.shutdown();
    }
}
服务端
package org.eddy.io;
import lombok.Getter;
import lombok.Setter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by eddy on 2017/5/20.
*/
/**
 * Non-blocking NIO demo server.
 *
 * <p>Listens on port 8081, accepts connections and, per connection, alternates
 * between reading a request and writing a fixed reply. The event loop stops
 * after {@code MAX_WRITES} replies in total or when the listening socket fails;
 * every registered channel and the selector are closed on the way out.
 */
@Getter
@Setter
public class Server implements Runnable {

    /** Port the server listens on. */
    private static final int PORT = 8081;

    /** Requested maximum number of pending connections on the listening socket. */
    private static final int BACKLOG = 500;

    /** Total number of replies to send before the server stops. */
    private static final int MAX_WRITES = 3;

    /** Charset used for every message exchanged with clients. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private ServerSocketChannel ssc;
    private Selector selector;
    /** Counts the replies written so far. */
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    {
        try {
            selector = Selector.open();
            // Open and bind in two steps so the channel can be closed if bind() fails.
            ssc = ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(PORT), BACKLOG);
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // E.g. the port is already in use: release what was opened and fail fast
            // instead of leaving a server that waits forever on an empty selector.
            closeResource(ssc);
            closeResource(selector);
            throw new IllegalStateException("failed to initialise NIO server on port " + PORT, e);
        }
    }

    /**
     * Runs the selector loop until {@code MAX_WRITES} replies were sent or the
     * listening socket / selector failed. Always closes all registered channels
     * and the selector before returning.
     */
    @Override
    public void run() {
        try {
            while (atomicInteger.get() < MAX_WRITES) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    dispatch(key);
                }
            }
            System.out.println("end server");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeAll();
        }
    }

    /**
     * Routes one selected key to its handler. A failure on a client connection
     * only drops that connection; a failure while accepting is propagated.
     *
     * @param key the selected key
     * @throws IOException if accepting a connection fails
     */
    private void dispatch(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            handAccept(key);
            return;
        }
        try {
            if (key.isWritable()) {
                handWrite(key);
                atomicInteger.incrementAndGet();
            } else if (key.isReadable()) {
                handRead(key);
            }
        } catch (IOException e) {
            e.printStackTrace();
            // Closing the channel also cancels its key, so the broken connection
            // is not selected again.
            closeResource(key.channel());
        }
    }

    /**
     * Reads one request (at most 1 KiB), prints it and switches the key to writing.
     * If the client closed the connection, the channel is closed instead.
     *
     * @param key the selected key of a client connection
     * @throws IOException if reading from the channel fails
     */
    private void handRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 1k
        if (channel.read(byteBuffer) < 0) {
            // -1 means end-of-stream: there is no request to answer.
            System.out.println("client disconnected");
            channel.close();
            return;
        }
        byteBuffer.flip();
        byte[] data = new byte[byteBuffer.remaining()];
        byteBuffer.get(data);
        key.interestOps(SelectionKey.OP_WRITE);
        System.out.println("server read");
        System.out.println(new String(data, UTF_8));
    }

    /**
     * Sends the fixed reply and switches the key back to reading.
     *
     * @param key the selected key of a client connection
     * @throws IOException if writing to the channel fails
     */
    private void handWrite(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(ByteBuffer.wrap("server send to client\n".getBytes(UTF_8)));
        key.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Accepts a pending connection and registers it for reading.
     *
     * @param key the selected key of the listening channel
     * @throws IOException if accepting or registering the connection fails
     */
    private void handAccept(SelectionKey key) throws IOException {
        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
        SocketChannel channel = listener.accept();
        if (channel == null) {
            // Non-blocking accept() returns null when no connection is pending.
            return;
        }
        try {
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            closeResource(channel);
            throw e;
        }
        System.out.println("accept client");
    }

    /** Closes every channel registered with the selector, then the selector itself. */
    private void closeAll() {
        if (selector != null && selector.isOpen()) {
            // Iterate over a snapshot of the key set while closing the channels.
            for (SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
                closeResource(key.channel());
            }
        }
        closeResource(ssc);
        closeResource(selector);
    }

    /**
     * Closes the given resource, reporting (not propagating) any failure.
     *
     * @param resource the resource to close; may be {@code null}
     */
    private static void closeResource(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts the server on a single worker thread and lets the JVM exit once it finishes.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Server server = new Server();
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        // execute() rather than submit(): an unexpected runtime failure is reported
        // by the worker thread instead of being hidden in an unread Future.
        executor.execute(server);
        // Without shutdown() the idle worker thread keeps the JVM alive forever.
        executor.shutdown();
    }
}