Part1
1
Netty——分隔符和定长解码器
TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用一下4种方式:
1.消息长度固定。累计读取到指定长度的消息后就认为读取了一个完整的消息;将计数器置位,重新开始读取下一个数据报
2.将回车符作为消息结束符。如FTP协议
3.将特殊的分隔符作为消息结束的标志。换行符就是一种特殊的结束分隔符
4.通过在消息头中长度字段来表示消息的总长度
Netty对上述4种方式提供了统一的抽象,提供4种解码器来解决对应的问。
2
解码器介绍
DelimiterBasedFrameDecoder:自动完成以分隔符作为标识符的消息接码
FixedLengthFrameDecoder:自动完成对定长消息的接码
Part2
1
DelimiterBasedFrameDecoder客户端
public class DelimiterBasedFrameDecoderEchoClient {
public void connect(int port, String host) {
try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoClientHandler());
}
});
//异步链接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//等待客户端
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new DelimiterBasedFrameDecoderEchoClient().connect(port, "127.0.0.1");
}
}
2
DelimiterBasedFrameDecoder客户端处理类
public class DelimiterBasedFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
private AtomicInteger count = new AtomicInteger(0);
private byte[] req;
public DelimiterBasedFrameDecoderEchoClientHandler() {
req = ("hello world" + "$_").getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
//循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* 读取并打印消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
3
DelimiterBasedFrameDecoder服务器端
public class DelimiterBasedFrameDecoderEchoServer {
public void bind(int port) {
//配置服务器端NIO线程组
//NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
//netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
//功能类似于NIO中的ServerSocketChannel
bootstrap.group(bossLoopGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//配置NioServerSocketChannel的参数
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
//绑定事件的处理类ChildChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
//DelimiterBasedFrameDecoder解码器 $_ 作为分隔符
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
//StringDecoder解码器
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoServerHandler());
}
});
//绑定端口,同步等待绑定操作完成
ChannelFuture channelFuture = bootstrap.bind(port).sync();
//等待服务器监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new DelimiterBasedFrameDecoderEchoServer().bind(port);
}
}
4
DelimiterBasedFrameDecoder服务器端处理类
public class DelimiterBasedFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {
private AtomicInteger count = new AtomicInteger(0);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String body = (String) msg;
System.out.println("服务器端第" + count.incrementAndGet() + "次收到消息:" + body);
ByteBuf response = Unpooled.copiedBuffer(("当前时间:" + new Date() + "$_").getBytes());
//并不是直接把消息发送到SocketChannel中,只是把消息发送到缓冲数组,通过flush方法将消息发到SocketChannel
ctx.writeAndFlush(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//将消息发送队列中的消息写入SocketChannel中,发送到对方
//防止频繁的唤醒Selector进行消息发送
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//发生异常关闭ChannelHandlerContext等资源
ctx.close();
}
}
5
DelimiterBasedFrameDecoder执行结果
服务器端第1次收到消息:hello world
服务器端第2次收到消息:hello world
服务器端第3次收到消息:hello world
服务器端第4次收到消息:hello world
服务器端第5次收到消息:hello world
服务器端第6次收到消息:hello world
服务器端第7次收到消息:hello world
服务器端第8次收到消息:hello world
服务器端第9次收到消息:hello world
服务器端第10次收到消息:hello world
······
服务器端第99次收到消息:hello world
服务器端第100次收到消息:hello world
客户端第1次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第2次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第3次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第4次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第5次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第6次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第7次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第8次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第9次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第10次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
·····
客户端第98次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第99次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第100次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
Part3
1
FixedLengthFrameDecoder客户端
public class FixedLengthFrameDecoderEchoClient {
public void connect(int port, String host) {
try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoClientHandler());
}
});
//异步链接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//等待客户端
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new FixedLengthFrameDecoderEchoClient().connect(port, "127.0.0.1");
}
}
2
FixedLengthFrameDecoder客户端处理类
public class FixedLengthFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
private AtomicInteger count = new AtomicInteger(0);
private byte[] req;
public FixedLengthFrameDecoderEchoClientHandler() {
req = ("hello world").getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
//循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* 读取并打印消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
3
FixedLengthFrameDecoder服务器端
public class FixedLengthFrameDecoderEchoServer {
public void bind(int port) {
//配置服务器端NIO线程组
//NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup()){
//netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
//功能类似于NIO中的ServerSocketChannel
bootstrap.group(bossLoopGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//配置NioServerSocketChannel的参数
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
//绑定事件的处理类ChildChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//FixedLengthFrameDecoder解码器
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(11));
//StringDecoder解码器
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoServerHandler());
}
});
//绑定端口,同步等待绑定操作完成
ChannelFuture channelFuture = bootstrap.bind(port).sync();
//等待服务器监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new FixedLengthFrameDecoderEchoServer().bind(port);
}
}
4
FixedLengthFrameDecoder服务器端处理类
public class FixedLengthFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String body = (String) msg;
System.out.println("服务器端收到消息:" + body);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//将消息发送队列中的消息写入SocketChannel中,发送到对方
//防止频繁的唤醒Selector进行消息发送
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//发生异常关闭ChannelHandlerContext等资源
ctx.close();
}
}
5
FixedLengthFrameDecoder测试结果
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
······
END