前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty解码器

Netty解码器

作者头像
黑洞代码
发布2021-01-14 15:38:12
7320
发布2021-01-14 15:38:12
举报
文章被收录于专栏:落叶飞翔的蜗牛

Part1

1

Netty——分隔符和定长解码器

TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用一下4种方式:

1.消息长度固定。累计读取到指定长度的消息后就认为读取了一个完整的消息;将计数器置位,重新开始读取下一个数据报

2.将回车符作为消息结束符。如FTP协议

3.将特殊的分隔符作为消息结束的标志。换行符就是一种特殊的结束分隔符

4.通过在消息头中长度字段来表示消息的总长度

Netty对上述4种方式提供了统一的抽象,提供4种解码器来解决对应的问。

2

解码器介绍

DelimiterBasedFrameDecoder:自动完成以分隔符作为标识符的消息接码

FixedLengthFrameDecoder:自动完成对定长消息的接码

Part2

1

DelimiterBasedFrameDecoder客户端

代码语言:javascript
复制
public class DelimiterBasedFrameDecoderEchoClient {

    public void connect(int port, String host) {
        try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoClientHandler());
                        }
                    });
            //异步链接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //等待客户端
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new DelimiterBasedFrameDecoderEchoClient().connect(port, "127.0.0.1");
    }
}

2

DelimiterBasedFrameDecoder客户端处理类

代码语言:javascript
复制
public class DelimiterBasedFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
    private AtomicInteger count = new AtomicInteger(0);
    private byte[] req;

    public DelimiterBasedFrameDecoderEchoClientHandler() {
        req = ("hello world" + "$_").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        //循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }

    }

    /**
     * 读取并打印消息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

3

DelimiterBasedFrameDecoder服务器端

代码语言:javascript
复制
public class DelimiterBasedFrameDecoderEchoServer {

    public void bind(int port) {
        //配置服务器端NIO线程组
        //NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
        try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
            //netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
            ServerBootstrap bootstrap = new ServerBootstrap();
            //功能类似于NIO中的ServerSocketChannel
            bootstrap.group(bossLoopGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //配置NioServerSocketChannel的参数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //绑定事件的处理类ChildChannelHandler
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            //DelimiterBasedFrameDecoder解码器 $_ 作为分隔符
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            //StringDecoder解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoServerHandler());
                        }
                    });
            //绑定端口,同步等待绑定操作完成
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            //等待服务器监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new DelimiterBasedFrameDecoderEchoServer().bind(port);
    }
}

4

DelimiterBasedFrameDecoder服务器端处理类

代码语言:javascript
复制
public class DelimiterBasedFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {

    private AtomicInteger count = new AtomicInteger(0);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String body = (String) msg;
        System.out.println("服务器端第" + count.incrementAndGet() + "次收到消息:" + body);
        ByteBuf response = Unpooled.copiedBuffer(("当前时间:" + new Date() + "$_").getBytes());
        //并不是直接把消息发送到SocketChannel中,只是把消息发送到缓冲数组,通过flush方法将消息发到SocketChannel
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //将消息发送队列中的消息写入SocketChannel中,发送到对方
        //防止频繁的唤醒Selector进行消息发送
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //发生异常关闭ChannelHandlerContext等资源
        ctx.close();
    }
}

5

DelimiterBasedFrameDecoder执行结果

代码语言:javascript
复制
服务器端第1次收到消息:hello world
服务器端第2次收到消息:hello world
服务器端第3次收到消息:hello world
服务器端第4次收到消息:hello world
服务器端第5次收到消息:hello world
服务器端第6次收到消息:hello world
服务器端第7次收到消息:hello world
服务器端第8次收到消息:hello world
服务器端第9次收到消息:hello world
服务器端第10次收到消息:hello world
······
服务器端第99次收到消息:hello world
服务器端第100次收到消息:hello world

客户端第1次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第2次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第3次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第4次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第5次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第6次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第7次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第8次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第9次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第10次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
·····
客户端第98次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第99次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第100次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018

Part3

1

FixedLengthFrameDecoder客户端

代码语言:javascript
复制
public class FixedLengthFrameDecoderEchoClient {

    public void connect(int port, String host) {
        try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoClientHandler());
                        }
                    });
            //异步链接操作
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            //等待客户端
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new FixedLengthFrameDecoderEchoClient().connect(port, "127.0.0.1");
    }
}

2

FixedLengthFrameDecoder客户端处理类

代码语言:javascript
复制
public class FixedLengthFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
    private AtomicInteger count = new AtomicInteger(0);
    private byte[] req;

    public FixedLengthFrameDecoderEchoClientHandler() {
        req = ("hello world").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        //循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }

    }

    /**
     * 读取并打印消息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

3

FixedLengthFrameDecoder服务器端

代码语言:javascript
复制
public class FixedLengthFrameDecoderEchoServer {

    public void bind(int port) {
        //配置服务器端NIO线程组
        //NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
        try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup()){
            //netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
            ServerBootstrap bootstrap = new ServerBootstrap();
            //功能类似于NIO中的ServerSocketChannel
            bootstrap.group(bossLoopGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    //配置NioServerSocketChannel的参数
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //绑定事件的处理类ChildChannelHandler
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //FixedLengthFrameDecoder解码器
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(11));
                            //StringDecoder解码器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoServerHandler());
                        }
                    });
            //绑定端口,同步等待绑定操作完成
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            //等待服务器监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new FixedLengthFrameDecoderEchoServer().bind(port);
    }
}

4

FixedLengthFrameDecoder服务器端处理类

代码语言:javascript
复制
public class FixedLengthFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String body = (String) msg;
        System.out.println("服务器端收到消息:" + body);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //将消息发送队列中的消息写入SocketChannel中,发送到对方
        //防止频繁的唤醒Selector进行消息发送
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //发生异常关闭ChannelHandlerContext等资源
        ctx.close();
    }
}

5

FixedLengthFrameDecoder测试结果

代码语言:javascript
复制
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
······

END

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-09-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 落叶飞翔的蜗牛 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档