1.缓冲区
缓冲区Buffer是一个对象,它包含要写入和读出的数据。在NIO中加入Buffer对象是与BIO的一个重要区别。NIO中所有数据都是通过缓冲区处理的。缓冲区实质上是一个数组,缓冲区提供对数据的结构化访问和维护读写位置等信息。
ByteBuffer:字节缓冲区
CharBuffer:字符缓冲区
ShortBuffer:短整型缓冲区
IntBuffer:整型缓冲区
LongBuffer:长整型缓冲区
FloatBuffer:浮点型缓冲区
DoubleBuffer:双精度浮点型缓冲区
Boolean型没有对应的缓冲区
这些缓冲区都继承自Buffer
2.Channel
Channel是一个通道,网络数据通过Channel进行读写。
通道与流的区别:
流是一个方向上的移动(输入流或者输出流)
通道是双向的,通道可读可写并可以同时进行
Channel可以分为两大类:
SelectableChannel:用于网络读写
FileChannel:用于文件操作
3.多路复用器
多路复用器提供选择已经就绪的任务的能力。Selector会轮询注册在其上的Channel,如果某个Channel发生了读写时间,这个Channel就处于就绪态,就会被Selector轮询出来,然后通过SelectionKey可以获取就绪的Channel集合,进行后续的IO操作。
一个Selector可以同时轮询多个Channel,JDK只用epoll代替传统的select实现,所以只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。
4.NIO服务端时序图
Netty的IO线程NioEventLoop由于聚合了多路复用器Selector,可以同时并发处理成百上千个客户端Channel,由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁IO阻塞导致的线程挂起。另外,由于Netty采用了异步通信模式,一个IO线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞IO一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
下面用netty实现一个客户端和服务端之间的通信
public class NioServer {
public static void main(String[] args) {
//多路复用类
NioServerTask nioServerTask = new NioServerTask(8888);
//单独的线程维护多路复用器
new Thread(nioServerTask).start();
}
}
这个main函数中主要的逻辑就是new一个线程,在线程中提交一个任务。看下NioServerTask代码如下:
public class NioServerTask implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
/**
* 初始化多路复用器,绑定端口
* @param port
*/
public NioServerTask(int port) {
try{
//多路复用器
selector = Selector.open();
//打开ServerSocketChannel, 监听客户端连接
serverSocketChannel = ServerSocketChannel.open();
//非阻塞模式
serverSocketChannel.configureBlocking(false);
//监听端口
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
// ServerSocketChannel注册到多路复用器Selector上,监听ACCEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动端口:" + port);
} catch (Exception e) {
e.printStackTrace();
}
}
public void stop () {
this.stop = true;
}
@Override
public void run() {
//轮询就绪的key
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey key = null;
while (iterator.hasNext()) {
key = iterator.next();
iterator.remove();
try {
//处理key
handleKey(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 处理key
* @param key
* @throws IOException
*/
private void handleKey(SelectionKey key) throws IOException {
if (key.isValid()) {
//监听到有新的客户端接入
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//完成TCP三次握手,TCP物理链接建立
SocketChannel socketChannel = serverSocketChannel.accept();
//客户端设置为非阻塞
socketChannel.configureBlocking(false);
//新的客户端注册到多路复用器Selector上,监听读操作,读取客户端发送的消息
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 读取数据
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
//1M的缓冲区
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//读取客户端请求到缓冲区
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
//当前缓冲区limit设置为position,position设置为0,便于后续对缓冲区的读取
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes);
System.out.println("获取客户端输入:" + body);
doWrite(socketChannel, "当前时间是:" + new Date());
} else if (readBytes < 0) {
//关闭
key.cancel();
socketChannel.close();
}
}
}
}
/**
* 发送响应消息
* @param socketChannel
* @param response
* @throws IOException
*/
private void doWrite(SocketChannel socketChannel, String response) throws IOException {
if (!StringUtils.isEmpty(response)) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
}
}
}
对应的Nio客户端代码如下:
public class NioClient {
public static void main(String[] args) {
NioClientTask nioClientTask = new NioClientTask("127.0.0.1", 8888);
new Thread(nioClientTask).start();
}
}
与服务端一样,main函数这里也没有什么逻辑,还是看看NioClientTask的代码如下:
public class NioClientTask implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public NioClientTask(String host, int port) {
this.host = host;
this.port = port;
try {
selector = Selector.open();
//打开SocketChannel
socketChannel = SocketChannel.open();
//设置SocketChannel为非阻塞
socketChannel.configureBlocking(false);
} catch (Exception e) {
}
}
@Override
public void run() {
try {
connect();
} catch (Exception e) {
e.printStackTrace();
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeySet.iterator();
SelectionKey selectionKey = null;
while (iterator.hasNext()) {
selectionKey = iterator.next();
iterator.remove();
try {
handle(selectionKey);
} catch (Exception e) {
if (selectionKey != null) {
selectionKey.cancel();
if (selectionKey.channel() != null) {
selectionKey.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void handle(SelectionKey selectionKey) throws IOException {
if (selectionKey.isValid()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//是否连接成功
if (selectionKey.isConnectable()) {
if (socketChannel.finishConnect()) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
System.exit(1);
}
}
}
if (selectionKey.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes);
System.out.println(body);
} else if (readBytes < 0) {
selectionKey.channel();
socketChannel.close();
} else {
//忽略
}
}
}
/**
* 连接服务端
* @throws IOException
*/
private void connect() throws IOException {
//如果连接成功,注册到多路复用器上,发送请求
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
//不代表连接失败,等待服务端返回syn-ack
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
/**
* 写入消息
* @param socketChannel
* @throws IOException
*/
private void doWrite(SocketChannel socketChannel) throws IOException {
byte[] req = "hello world".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
socketChannel.write(writeBuffer);
}
}
这样就完成了一个简单的NIO通讯的客户端和服务端。
服务端输入如下:
服务器启动端口:8888
获取客户端输入:hello world
客户端输出如下:
当前时间是:Wed Sep 05 17:11:07 CST 2018