学习netty之前,必须先了解下什么是nio,关于nio的教程网上非常多,和传统io的比较,优势,如何使用,各个方面都有比较细致的分析。
而netty可以说是nio实现的最佳方式,相比你来说
1、封装了使用nio的复杂性,提供了较简的api供开发者使用
2、解决了nio编程中的bug与性能问题
3、提供了一些默认的网络通信模型与协议实现
4、活跃的社区与经过考验的用户
首先还是有必要对nio做个简单的了解,关于nio主要涉及到三个元素channel、buffer、selector,可以说对nio的使用也就是对这三个对象的方法使用:
通过一个例子简单理解如何使用:
//服务端
public class Server implements Runnable{
private boolean running = false;
private static int port = 9100;
private ServerSocketChannel channel;
private Selector selector;
private ByteBuffer buffer = ByteBuffer.allocate(128);
public Server(){
listener();
}
private void listener(){
try {
channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(port));
channel.configureBlocking(false);
selector = Selector.open();
channel.register(selector, SelectionKey.OP_ACCEPT);
running = true;
System.out.println(String.format("启动成功,监听端口:%s",port));
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
while (true){
try {
int count = selector.select();//接入的客户端数量
if(count>0){
Iterator<SelectionKey> keyIt = selector.selectedKeys().iterator();
while (keyIt.hasNext()){
SelectionKey key = keyIt.next();
if(key.isValid()){//此键是否有效
// System.out.println("valid");
}
if(key.isConnectable()){//此键的通道是否已完成其套接字连接操作
// System.out.println("connectable");
}
if(key.isAcceptable()){//此键的通道是否已准备好接受新的套接字连接
System.out.println("acceptable");
ServerSocketChannel _channel = (ServerSocketChannel)key.channel();//和上面定义channel一样
_channel.accept()//这个就是acceptable,获取连接的客户端
.configureBlocking(false)
.register(selector, SelectionKey.OP_READ );
}
//读的时候需要注意limit的位置(remaining()),而并不是整个buffer的内容(array())
if(key.isReadable()){//此键的通道是否准备读取
// System.out.println("readable");
buffer.flip();//写->读 l->p,p->0
SocketChannel _channel = (SocketChannel)key.channel();//客户端
//将buffer读到channel中
int len = _channel.read(buffer);//position、limit变化了 P->l
buffer.flip();//如果不调用remaining()
System.out.println(new String(buffer.array(),0,buffer.remaining()));//len
buffer.clear();
_channel.register(selector, SelectionKey.OP_WRITE);
}
//在写的时候,对于buffer经历了写(将数据写入buffer)、读(从buffer读取写入channel)
if(key.isWritable()){//此键的通道是否准备写入
// System.out.println("writable");
SocketChannel _channel =(SocketChannel)key.channel();//服务端
buffer.clear();//读->写,准备往buffer写入 p->0,l->c
String msg = "message :"+new Date();
buffer.put(msg.getBytes());
buffer.flip();//写->读,channel从buffer中读出 l->p,p->0
_channel.write(buffer);
_channel.register(selector, SelectionKey.OP_READ);
}
keyIt.remove();//需要主动删除,不然会重复消费
}
}
} catch (IOException e) {
// _channel.close();
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Thread app = new Thread(new Server());
app.start();
}
}
我们可以直接使用telnet localhost 9100,或者自己写个简单的客户端:
public class Client implements Runnable{
private SocketChannel clientChannel;
private Selector selector;
private ByteBuffer buffer = ByteBuffer.allocate(128);
public Client() throws IOException {
clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false);
clientChannel.connect(new InetSocketAddress("localhost",9100));
selector = Selector.open();
clientChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_CONNECT);
while (!clientChannel.finishConnect()){
}
System.out.println("连接成功");
}
public void write(String msg){
buffer.clear();
buffer.put(msg.getBytes());
buffer.flip();
try {
clientChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
while (true){
try {
selector.select();
Iterator<SelectionKey> keyIt = selector.selectedKeys().iterator();
while (keyIt.hasNext()){
SelectionKey key = keyIt.next();
if(key.isConnectable()){
System.out.println("connected");
}
if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
buffer.clear();
int len = channel.read(buffer);
buffer.flip();
System.out.println( "接收信息:"+ new String(buffer.array(),0,len));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
Client client = new Client();
new Thread(client).start();
Scanner scanner = new Scanner(System.in);
while (true){
client.write(scanner.nextLine());
}
}
}
主要通过示例理解开发的过程,以及上门说到的三个对象以及其衍生对象的使用方法,特别要注意下buffer的读写切换,比如我们在往channel中写入数据时,首先需要调用clear()方法,然后channel从buffer中读取数据,需要调用flip()将其转换成读模式。其次,在从buffer读取数据时,比如多次对buffer写入过,新的写入前调用的clear()并不是将buffer清空,只不过将其position与limit的位置进行了重设。
比如
CharBuffer buffer = CharBuffer.allocate(10);
buffer.put(new char[]{'1','2','3','4','5'});
buffer.clear();
buffer.put(new char[]{'a'});
//buffer里面的值其实是 a 1 2 3 4 5 但是读模式时 limit=1;不要用buffer.array(),那样是所有的
关于netty,相比而言使用比较简单,可以看到我们如果单纯的使用nio进行开发,需要关注很多细节的处理,同时还有性能的保证都是需要自己去处理的,特别是在网络通信中,各种协议的实现、通信保持、心跳检查、数据编码、拆包粘包、数据缓存等实现都需要开发者去完成,因此开发一套可靠性比较高的通信框架是非常复杂的。
netty既然封装了这些复杂度,那么我们如何去使用,还是通过示例去属性,毕竟关于netty的实现原理,通信模型网上有太多的解释,作为了解已经足够,通过示例学习,能更快了解该框架:
//服务端
public class Server {
private int port;
public Server(int port) {
this.port = port;
}
public void run() throws InterruptedException {
//1
EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
EventLoopGroup wookerLoopGroup = new NioEventLoopGroup();
//2
ServerBootstrap serverBootstrap = new ServerBootstrap();
//3
serverBootstrap.group(bossLoopGroup, wookerLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128) //设置TCP缓冲区
.option(ChannelOption.SO_SNDBUF, 10*1024) //发送缓冲
.option(ChannelOption.SO_RCVBUF, 10*1024) //接收缓冲
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
System.err.println("接收到的信息:" + new String(req));
ByteBuf resp = Unpooled.copiedBuffer("服务端返回信息".getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
});
}
});
//4
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
//5
wookerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
public static void main(String[] args) throws InterruptedException {
new Server(9093).run();
}
}
//客户端
public class Client {
private String url;
private int port;
public Client(String url,int port){
this.url = url;
this.port = port;
}
public void start() throws InterruptedException {
//1
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
//2
Bootstrap bootstrap = new Bootstrap();
//3
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
byte[] bs = new byte[buffer.readableBytes()];
buffer.readBytes(bs);
System.err.println("客户端接收的消息:"+new String(bs));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
});
}
});
//4
ChannelFuture future = bootstrap.connect(url, port);
//5
future.channel().writeAndFlush(Unpooled.copiedBuffer("客户端发送的信息!".getBytes()));
future.channel().closeFuture().sync();
//6
eventLoopGroup.shutdownGracefully();
}
public static void main(String[] args) throws InterruptedException {
Client client = new Client("localhost",9093);
client.start();
}
}