简介
什么是 Netty?让我们带着问题来跟着官网的 Demo 教程先入个门。
Netty 是异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
dependencies {
implementation "io.netty:netty-all:4.1.56.Final"
}
世界上最简单的协议实现不是发送
Hello World
消息,被服务器接受到返回相应的响应结果。而是服务器接收到消息后直接丢弃,不做任何响应。
要实现
DISCARD
协议,您需要做的唯一一件事就是忽略所有接收到的数据。让我们直接从处理程序实现开始,它处理 Netty 生成的I/O
事件。
// [1]
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
// [2]
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
// 不处理消息,直接释放
// [3]
((ByteBuf) msg).release();
}
// [4]
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 当异常发生的时候关闭连接
cause.printStackTrace();
ctx.close();
}
}
ChannelInboundHandlerAdapter
实现了接口 ChannelInboundHandler
。充当适配器的角色提供了各种可以重写的事件处理程序方法,通过适配器的标准实现方式,可以避免我们自己实现处理程序接口。channelRead()
的事件处理器方法。只要从客户机接收到新数据,就会使用接收到的消息调用此方法。DISCARD
协议,处理程序必须忽略接收到的消息。ByteBuf
是一个引用计数的对象,必须通过release()
方法来进行释放。通常的事项方式是这样的// [2]
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);
// 不处理消息,直接释放
// [3]
//((ByteBuf) msg).release();
try {
// 针对消息 msg 进行处理
} finally {
// 释放引用
ReferenceCountUtil.release(msg);
}
}
exceptionCaught()
作为异常处理,当 Netty 由于I/O
错误或处理程序实现由于处理事件时抛出的异常而引发异常时,使用 Throwable 调用事件处理程序方法。在大多数情况下,被捕获的异常应该被记录,其相关的通道应该在这里关闭,尽管这个方法的实现可以根据您想要处理的异常情况而有所不同。例如,您可能希望在关闭连接之前发送带有错误代码的响应消息。启动服务器
public class DiscardServer {
/**
* 端口
*/
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
// [1]
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// [2]
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
// [3]
.channel(NioServerSocketChannel.class)
// [4]
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
// [5]
.option(ChannelOption.SO_BACKLOG, 128)
// [6]
.childOption(ChannelOption.SO_KEEPALIVE, true);
// [7]
ChannelFuture cf = bootstrap.bind(port).sync();
cf.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
NioEventLoopGroup
是一个处理I/O
操作的多线程事件循环的处理器定义。例子中定义了2个处理器:ServerBootstrap
是服务器构造的辅助类,一般不推荐此方式进行服务器的创建。NioServerSocketChannel
类,用于实例化一个新的Channel
来接受传入的连接。Channel
,ChannelInitializer
作为特殊的处理程序,用于帮助用户配置新的Channel
。往往适用于为新的Channel
添加一些处理程序来实现更为复杂的应用程序。option
参数设置,支持设置特定的套接字选项。来满足特定的协议需求,如.option(ChannelOption.TCP_NODELAY, true)
来编写TCP/IP 服务协议。childOption
与option
不同之处在于:option
适用于NioServerSocketChannel
来接受传入的连接。childOption
适用于被父级的ServerChannel
接受的Channels
。模拟通信
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
// 针对消息 msg 进行处理
while (in.isReadable()) { // [4]
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg);
}
}
telnet localhost 8080
,进行通信。目前为止,我们只接受但是没有任何响应。一台服务器,通常应该响应该请求。让我们学习如何通过实现
ECHO
协议向客户端写入响应消息,其中任何接收到的数据都被发送回来。 与前面部分实现的丢弃服务器的唯一区别在于它将接收到的数据发回,而不是将接收的数据输出到控制台。因此,再次修改channelRead()
方法是足够的: 参考地址:https://netty.io/4.1/xref/io/netty/example/echo/package-summary.html
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 当异常发生的时候关闭连接
cause.printStackTrace();
ctx.close();
}
}
public class EchoServer {
/**
* 端口
*/
private int port;
public EchoServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture cf = bootstrap.bind(port).sync();
cf.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new EchoServer(port).run();
}
}
通过终端输入telnet localhost 8080
后输入英文字符会得到响应,原字符返回。如依次输入abc
,终端打印结果:
aabbcc
接下来要实现的协议是 TIME 协议。它不同于前面的示例,因为它发送包含32位整数的消息,而不接收任何请求,并在消息发送后关闭连接。在本例中,您将学习如何构造和发送消息,以及如何在完成时关闭连接。
因为我们将忽略任何接收到的数据,但是一旦建立连接就发送消息,所以这次不能使用 channelRead() 方法。相反,我们应该重写 channelActive()方法。代码如下:
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // [1]
final ByteBuf time = ctx.alloc().buffer(4); // [2]
time.writeInt(89); //ASCII 10进制,对应 Y
time.writeInt(105); //ASCII 10进制,对应 i
final ChannelFuture f = ctx.writeAndFlush(time); // [3]
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // [4]
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 当异常发生的时候关闭连接
cause.printStackTrace();
ctx.close();
}
}
如前所述,当建立连接并准备生成通信量时,将调用 channelActive ()方法。让我们编写一个32位的整数,它表示此方法中的当前时间。
要发送一个新消息,我们需要分配一个新的缓冲区,其中将包含消息。我们要写一个32位的整数,因此我们需要一个容量至少为4字节的 ByteBuf。通过 ChannelHandlerContext.alloc ()获取当前 ByteBufAllocator 并分配一个新缓冲区。
像往常一样,我们写入一条构造好的消息。但是,等等,哪里冒险了?我们以前不是叫 java.nio 吗。在 NIO 中发送消息之前使用 ByteBuffer.flip () ?ByteBuf 没有这样的方法,因为它有两个指针: 一个用于读操作,另一个用于写操作。当您将某些内容写入 ByteBuf 而读取器索引不变时,写入器索引会增加。读者索引和写者索引分别表示消息的开始和结束位置。
I/O
操作。这意味着,任何请求的操作可能尚未执行,因为所有操作在 Netty 都是异步的。例如,下面的代码可能会在发送消息之前关闭连接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
那么,当写请求完成时,我们如何得到通知呢?这很简单,可以添加一个ChannelFutureListener
来监听返回的结果ChannelFuture
。在这里,我们创建了一个新的匿名通道 ChannelFutureListener
,当操作完成时它会关闭通道。
f.addListener(ChannelFutureListener.CLOSE);
telnet localhost 8080
命令,终端在连接上后,打印消息后直接失去连接:Yi
遗失对主机的连接。
与 DISCARD 和 ECHO 服务器不同,我们需要 TIME 协议的客户端,因为人不能将32位二进制数据转换为日历上的日期。在本节中,我们将讨论如何确保服务器正常工作,并学习如何使用 Netty 编写客户机。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // [1]
final ByteBuf time = ctx.alloc().buffer(4); // (2)
// 2208988800为1900年1月1日00:00:00~1970年1月1日00:00:00的总秒数
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 当异常发生的时候关闭连接
cause.printStackTrace();
ctx.close();
}
}
public class TimeServer {
/**
* 端口
*/
private int port;
public TimeServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 在 Netty,服务器和客户机之间最大也是唯一的区别是使用了不同的 Bootstrap 和 Channel 实现。请看下面的代码:
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture cf = bootstrap.bind(port).sync();
cf.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new TimeServer(port).run();
}
}
客户端接收到响应打印结果:
Tue Dec 29 12:01:58 CST 2020
在基于流的传输(如 TCP/IP)中,接收到的数据被存储到套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是一个包队列,而是一个字节队列。这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将其视为两条消息,而只是将其视为一堆字节。因此,不能保证您所读到的内容与远程对等方所写的内容完全一致。
例如,假设一个操作系统的 TCP/IP 协议栈已经接收了三个数据包:
1
由于基于流的协议的这个一般属性,在应用程序中很有可能以下面的碎片形式读取它们:
2
因此,接收部分,无论是服务器端还是客户端,都应该将接收到的数据碎片整理成一个或多个有意义的帧,应用程序逻辑可以很容易地理解这些帧。在上面的例子中,接收到的数据应该如下所示:
3
现在让我们回到 TIME 客户端示例。我们这里也有同样的问题。32位整数是一个非常小的数据量,它不太可能经常被分段。然而,问题在于它可能是支离破碎的,并且随着流量的增加,支离破碎的可能性也会增加。
最简单的解决方案是创建一个内部累积缓冲区,并等待所有4个字节都被接收到内部缓冲区。以下是修改后的 TimeClientHandler 实现,它解决了这个问题:
public class TimeClientWithBufferHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buff;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
buff = ctx.alloc().buffer(4);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
buff.release();
buff = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf) msg; // (1)
buff.writeBytes(m);
m.release();
if (buff.readableBytes() >= 4) {
long currentTimeMillis = (buff.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 当异常发生的时候关闭连接
cause.printStackTrace();
ctx.close();
}
}
handlerAdded()
and 及handlerRemoved()
;buff
;buff
有足够的数据,在这个例子中是4个字节,然后继续进行实际的业务逻辑,当更多的数据到达时,这个函数会重新调用一个方法,最终所有的4个字节都会被累积;4
尽管第一个解决方案已经解决了 TIME 客户机的问题,但是修改后的处理程序看起来并不那么干净。想象一个更复杂的协议,它由多个字段组成,比如一个可变长度的字段。您的 ChannelInboundHandler 实现将很快变得不可维护。
正如您可能已经注意到的,您可以向 ChannelPipeline 添加多个 ChannelHandler,因此,您可以将一个单片 ChannelHandler 分割为多个模块化的 ChannelHandler,以降低应用程序的复杂性。例如,你可以将 TimeClientHandler 分成两个处理器:
TimeDecoder
处理碎片化问题TimeClientHandler
幸运的是,Netty 提供了一个可扩展的类,可以帮助你写出第一个开箱即用的类:
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
public class TimeClientWithDecoder {
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new TimeDecoder()) // (5)
.addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
ByteToMessageDecoder
使得处理分裂问题变得容易;ByteToMessageDecoder
利用内部维护的累积缓冲区,调用decode方法来处理新数据;ByteToMessageDecoder
什么都不会添加到out
缓冲区中。当收到更多的数据时会再次调用decode()
;decode()
将一个数据添加到out
, 这意味着解码器成功解码了一条信息,将丢弃累积缓冲区的读取部分。请记住,您不需要解码多个消息,ByteToMessageDecoder
将继续调用方法,直到它没什么数据可以放入out
了;ChannelPipeline
添加处理程序TimeDecoder
来实现数据的分解。还可以通过以下方式进一步简化解码器:
public class TimeWithReplayingDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(in.readBytes(4));
}
}
// 同样的,别忘了在 ChannelPipeline 中添加相应的处理程序
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//.addLast(new TimeDecoder())
.addLast(new TimeWithReplayingDecoder())
.addLast(new TimeClientHandler());
}
});
此外,Netty 提供了开箱即用的解码器,使您能够非常容易地实现大多数协议,并帮助您避免最终得到一个不可维护的整体处理程序实现。更详细的例子请参考以下软件包:
到目前为止,我们讨论的所有示例都使用 ByteBuf 作为协议消息的主要数据结构。实际的网络通信过程远比上面的时间协议实现的要更复杂,功能也要更加强大,比如我们常用的 Json 序列化传输,如果用 Netty,能否直接传输对象呢?
在 ChannelHandlers 中使用 POJO 的优势是显而易见的;通过分离从处理程序中提取 ByteBuf 信息的代码,您的处理程序变得更加可维护和可重用。在 TIME 协议的客户端和服务器示例中,我们只读取一个32位整数,直接使用 Bytebuf 并不是一个主要问题。但是,您会发现在实现现实世界的协议时有必要进行分离。
首先,我们将我们要传输的时间戳封装成一个简单对象:
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
增加解码器:
public class TimeDecoderWithPojo extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
//out.add(in.readBytes(4)); // (4)
out.add(new UnixTime(in.readUnsignedInt())); // (4)
}
}
增加处理器:
public class TimeClientHandlerWithPojo extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 当异常发生的时候关闭连接
cause.printStackTrace();
ctx.close();
}
}
和前面一样,设置客户端的处理器:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//.addLast(new TimeDecoder())
.addLast(new TimeDecoderWithPojo())
.addLast(new TimeClientHandlerWithPojo());
}
});
响应结果如下:
通过更新的解码器,``TimeClientHandler 不再使用 ByteBuf。
更简单和优雅,对不对?同样的技术也可以应用于服务器端。
首先是消息处理器,负责发送一个时间戳数据作为响应结果:
public class TimeServerHandlerWithPojo extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
UnixTime unixTime = new UnixTime();
System.out.println("准备发送:"+ unixTime);
final ChannelFuture f = ctx.writeAndFlush(unixTime);
f.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
// 当异常发生的时候关闭连接
cause.printStackTrace();
ctx.close();
}
}
然后是编码处理器,将Pojo
转换为ByteBuf
进行传输:
public class TimeServerEncoderHandlerWithPojo extends ChannelOutboundHandlerAdapter {
// 它是 ChannelOutboundHandler 的一个实现,它将 UnixTime 转换回 ByteBuf。这比编写解码器要简单得多,因为在对消息进行编码时不需要处理数据包碎片和汇编。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int) m.value());
ctx.write(encoded, promise); // (1)
// 首先,我们传递原始的 ChannelPromise as-is,这样当编码的数据实际写入到连线时,Netty 将其标记为成功或失败。
// 其次,我们没有调用 ctx.flush ()。有一个单独的处理程序方法 void flush (ChannelHandlerContext ctx) ,它旨在重写 flush ()操作。
}
}
最后是服务端ChannelPipeline
程序设置:
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new TimeServerEncoderHandlerWithPojo())
.addLast(new TimeServerHandlerWithPojo());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
服务端接收到请求发送数据:
客户端接收到请求的响应数据:
7
同样的,Netty 也为服务端的消息编码定义了很多拆箱即用的工具类:
public class TimeServerMessageToByteEncoderHandler extends MessageToByteEncoder<UnixTime> {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int) m.value());
ctx.write(encoded, promise); // (1)
}
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
out.writeInt((int) msg.value());
}
}
// ChannelPipeline 设置
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//.addLast(new TimeServerEncoderHandlerWithPojo())
.addLast(new TimeServerMessageToByteEncoderHandler())
.addLast(new TimeServerHandlerWithPojo());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
关闭 Netty 应用程序通常非常简单,只需关闭通过 shutdownly() 创建的所有 EventLoopGroups 即可。它返回一个 Future,当 EventLoopGroup 完全终止并且属于该组的所有通道都已关闭时,它会通知您。(前文示例已演示多次,此处不再赘述。)
源码:https://gitee.com/zacsnz/architectrue-adventure/tree/master/netty-examples/netty-chapter-1
Netty 作为高性能的异步通信框架,提供了很多很多好用的 API。