前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >netty 入门

netty 入门

作者头像
_淡定_
发布于 2020-05-12 06:46:31
发布于 2020-05-12 06:46:31
77800
代码可运行
举报
文章被收录于专栏:dotnet & javadotnet & java
运行总次数:0
代码可运行

文档 使用手册 近期公司通过TCP连接的的方式接了一个硬件设备,用了最基础的ServerSocket类,参考的oracle的文档 。 实现的比较简单,放在github 上,不过这里应该用Netty才是正解。所以,过一下Netty的入门文档。 本文demo

序言

问题

我们一般会用Http客户端库来调用web服务,获取数据。如果一个东西是出于一般性目的设计出来的,那么他在某些方面可能就不是最合适的。比如获取大文件,收发邮件,展示实时的金融数据,游戏数据传输等。为了实现这些需求,需要一个为其高度优化的特定协议。还有一个无法避免的问题是你可能需要调用老系统的数据,但是他的协议又是特定。重点来了,如何在不牺牲可靠性性能的前提下快速实现这么一个系统。

解决方案

用Netty。用Netty。用Netty。重要的事情说3遍。

Netty是一个异步 事件驱动 网络框架 ,可以用来快速开发易维护,高性能,可扩展的服务端/客户端。换句话说他简化了TCPUDP 等服务的网络开发。

容易开发或者快速开发并不意味着他会牺牲可维护性或者是面临性能问题。Netty吸取了大量用于实现FTP,SMTP,HTTP协议的经验,并且仔细小心谨慎的设计。所以,他在易于开发,追求性能,确保稳定性和灵活性上并没有对任何一点有所妥协。

有人可能会说别的框架他们也这么说自己,那Netty到底或者为什么和他们不一样。答案是他的设计理念。Netty提供的API用起来就非常舒服。现在可能不是那么直观,但是当你使用的时候就会体会到。

开始使用

这节会围绕Netty的核心构建过程,用几个例子来让你快速上手。学完这节你会可以在Netty框架的基础上学会写client和server。

如果你想学的深入一点,了解一下他的底层实现,第二节,架构概览是个不错的起点。

开始之前

这节需要两个东西,新版的Netty和jdk1.6+。Netty下载地址

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependencies>
    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.49.Final</version>
    </dependency>
</dependencies>

随着你不断往下看,你能会对这节引入的类有疑惑,你可以随时通过API文档来了解更多。类名都是带链接的,可以直接点过去。

编写一个Discard Server

前半部分

世界上最简单的协议并不是输出Hello world,而是Discard,就是过来什么都直接丢弃,并且不给任何回复。下面让我们直接从Netty提供的handler实现来处理IO事件。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class DiscardServerHandler extends ChannelInboundHandlerAdapter {//1
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//2
//        super.channelRead(ctx, msg);
        ((ByteBuf) msg).release();//3
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//4
//        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        ctx.close();
    }
}

有以下几点:

  1. 我们写一个DiscardServerHandler类继承自ChannelInboundHandlerAdapter,这个ChannelInboundHandlerAdapter继承自抽象类ChannelHandlerAdapter并且实现了接口ChannelInboundHandlerChannelInboundHandler提供了各种各样的可重写的事件handler方法。这里只要使用ChannelInboundHandlerAdapterChannelInboundHandler的默认实现就好,不需要自己去实现所有的ChannelInboundHandler方法。
  2. channelRead方法我们重写掉了,这个方法会在收到客户端消息的时候调用。例子中,消息msg的类型为ByteBufByteBuf是对byte[]的一种抽象,可以让我们访问数组内容。
  3. 我们这里需要实现的是Discard协议,就是丢弃协议,所以需要忽略收到的所有消息。ByteBuf是一种reference-counted的对象(可以简单理解指针之类的东西),必须通过显式调用其release方法来释放。通常,我们的channelRead方法是下面这样的 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try{ //对msg做一些处理 }finally { ReferenceCountUtil.release(msg); } }
  4. Netty在处理IO的遇到exception就会进入exceptionCaught方法。通常,需要做一下日志记录,然后把相关的channel(通道)关闭。这里做法也不是固定的,你可以先发一个带code的Response然后再关闭。

后半部分

到这一步,我们已经实现了Discard服务的前半部分,剩下的就是写一个main方法然后来启动这个DiscardServerHandler服务。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.discard;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class DiscardServer {
    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();//1
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();//2
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)//3
                    .childHandler(new ChannelInitializer<SocketChannel>() {//4
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)//5 最大连接数128
                    .childOption(ChannelOption.SO_KEEPALIVE, true);//6

            //绑定端口启动服务
            ChannelFuture f = b.bind(port).sync();//7
            //server关闭的时候调用。因为这里是Discard 服务,所以永远不会调用。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0)
            port = Integer.parseInt(args[0]);

        new DiscardServer(port).run();
    }
}

有以下几点:

  1. NioEventLoopGroup是一个多线程的event loop(事件环?)。Netty针对不同的情况提供了多(18)种EventLoopGroup的实现,因为这里是一个服务端应用,所以使用NioEventLoopGroup。new出来两个对象,通常第一个叫boss,接收进来的连接。第二个,通常叫worker,因为当boss接收了连接之后会把链接注册给worker,让worker来处理后面的通信。每个EventLoopGroup使用线程数以及他们如何被映射到ChannelEventLoopGroup的实现决定,并且可能可以通过构造函数来指定。
    1. 下面是NioEventGroup的部分构造函数。
    2. 什么是Netty的Channel。按照文档的介绍,可以简单理解为socket的一个抽象,或者是IO的操作,包括IO的读写,连接,绑定等。Channel会给使用者提供以下功能:
      • 当前的状态(连接是否已经打开或者连上)
      • channel的配置参数。(接收的缓冲区大小)
      • socket,io相关的操作(读写等)
      • 处理io事件的管道
      • 详细的以后再说
  2. ServerBootstarp是一个配置server的帮助类,你可以使用Channel自己来配置,但是会比较枯燥,所以,大多数情况下直接使用这个ServerBootstrap就好。
  3. NioServerSocketChannel是一个Channel的实例,用来处理进来的连接(上面说的channel的功能)。
  4. ChannelInitializer是一个特殊的Handler,作用是帮助用户配置Channel。通常的作用是把ChannelHandler放到ChannelPipeline(管道)里面,请求会进入到Pipeline,处理就按照这个Pipeline配置的Handler来。DiscardServerHandler就是一种Handler。
  5. 用来配置Channel的参数。顺道看一下ServerBootstrap的定义,这个ServerBootstrap是用来启动ServerChannelServerChannel实际上就是一个Channel。我们这里实现的是一个TCP/IP server,所以,可以设置tcpNoDelaykeepAlive等参数。具体设置看文档。 public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>{}
  6. option的为接收连接的配置,也就是给boss用,后面的childOption为worker配置选项。
  7. 万事俱备,只欠把绑定端口配置上去然后启动服务。main方法里面。

恭喜,搞定。用个tcp 客户端连接试试~~可以看到连接成功,发送了3字节,然后因为是Discard,所以没有返回。

收到的数据

让我们稍微修改一下代码,以便看看我们收到的数据。按照之前的例子,需要再channelRead方法里面做修改。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    final ByteBuf in = (ByteBuf) msg;
    try {
        System.out.println(in.toString(CharsetUtil.US_ASCII));
    } finally {
        in.release();
    }
}

msg可以直接转换成ByteBuf对象,然后用ByteBuf的toString方法,设置ASCII参数装成string打印出来。

运行起来然后可以直接在浏览器输入localhost:8080访问,就能看到传过来的数据。

写一个Echo Server

我们写一个Echo服务,客户端输入什么,我们就回复什么。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);//1
        ctx.flush();//2
    }
}
  1. 通过ChannelHandlerContext对象,我们可以触发一些IO事件或者执行一些操作。这里我们不需要手动release msg,因为当我们执行了wirte方法,Netty会帮我们释放。
  2. ctx.write(Object)会把内容写到缓冲区,在调用flush后再输出出去。可以用writeAndFlush方法代替。

测试一下,发送3个字节,收到3个字节的回复。

写一个TIME Server

这个例子用来实现一个Time协议。通过实现这个协议,我们可以了解Netty如何构造发送数据。根据RFC868协议,Time协议有这么几步

  1. 服务器监听37端口
  2. 客户端连接
  3. 服务端返回一个4字节的int时间数据
  4. 客户端接收到这个数据
  5. 客户端关闭连接
  6. 服务端关闭连接。

这里服务端忽略收到的任何客户端数据,而是当客户端一建立连接就返回数据,所以这里不使用channelRead方法,而是channelActive方法。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {//1
        final ByteBuf timeBuf = ctx.alloc().buffer(4);//2
        timeBuf.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture channelFuture = ctx.writeAndFlush(timeBuf);//3
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                assert channelFuture == future;
                ctx.close();
            }
        });//4

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 重写的是channelActive方法,这个方法会在连接进来的时候调用。
  2. 因为要返回一个int值,所以需要4个字节,通过ChannelHandlerContext分配,然后writeAndFlush方法写入并发送。
  3. 把数据发送给非阻塞IO流的时候不需要调用java.nio.ByteBuffer.flip()方法,Netty的ByteBuf没有提供这个方法,因为他不需要。ByteBuf内部有两个指针,一个用于读,一个用于写。write的时候读指针移动,写指针不动,反之同理。在使用ByteBuffer的时候如果没有flip,数据就会乱。 Netty里面所有的IO操作都是异步的,这样可能会导致write没有开始(或者没有完成)之前就连接就close掉了。比如下面的代码: Channel ch = ...; ch.writeAndFlush(message); ch.close();//这也不是立马关闭,也是一个ChannelFuture对象 write(writeAndFlush)返回的是一个ChannelFuture对象,来大致看下这个对象的解释。

继承自Future,表示一个Channel的IO操作的结果,不过他还没完成,只是表示已经创建。【详细的以后再讲。】

  1. 如何能知道这个IO操作的结果呢?我们可以给这个ChannelFuture增加一个ChannelFutureListener的实例(接口),然后实现它的operationComplete方法。这里面的方法比较简单,就是close掉这个ChannelHandlerContext,所以,可以使用定义好的ChannelFutureListener.CLOSE方法。像下面这样 channelFuture.addListener(ChannelFutureListener.CLOSE);
  2. 用rdate 测试一下。测试通过。

写一个TIME Client

写完server之后就要写client了。client程序和server程序最大的不同在于选择的BootstrapChannel的实现类的差异。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws InterruptedException {
        int port = 37;
        String host = "192.168.1.181";
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            final Bootstrap bootstrap = new Bootstrap();//1
            bootstrap.group(workerGroup);//2
            bootstrap.channel(NioSocketChannel.class);//3
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);//4
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());//6
                }
            });

            final ChannelFuture connectFuture = bootstrap.connect(host, port).sync();//5

            connectFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. Bootstrap和前面的ServerBootstrap类似,不过这个不是给non-server非服务器用,而是给客户端或者connectionless非连接的用。
  2. 客户端就不需要boss EventLoopGroup了。其实前面的Server中group能用同一个。 serverBootstrap.group(workGroup, workGroup);//同一个group
  3. channel也需要换成NioSocketChannel,而不是NioServerSocketChannel
  4. 这里直接用option方法,而不是childOption和option,因为对应client,没有childOption的概念。
  5. client需要去connect,而不是bind来监听。

看一下TimeClientHander.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final ByteBuf m = (ByteBuf) msg;//1
        try {
            final long currentTimeMills = (m.readUnsignedInt() - 2208988800L) * 1000L;//2
            System.out.println(new Date(currentTimeMills));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 对象转换成ByteBuf
  2. 直接通过readUnsignedInt()方法读取数值。

代码看起来比较简单,但是一定可能性(小概率)会有IndexOutOfBoundsException异常,我们下节讨论。

处理基于stream的传输

Socket Buffer的一个问题

TCP/IP是一个典型的stream-based协议,接收数据然后放到socket buffer里面。但是这个buffer队列存的是byte,而不是packet数据包。所以,就算发了两个packet,在系统看来,他就是一堆byte。所以,没有办法保证你读取到的东西和发过来的一定一样。

举个例子,假设收到了3个数据包,ABC,DEF,GHI

有可能收到的是下面这样的

所以,server和client需要一种规则来划分数据包,然后对方就知道每个包到底是啥样的。

第一种解决方案

其实道理上来说因为int数据包也就4个字节,所以不太会被分片,不太容易出现IndexOutOfBoundsException异常。但是,随着数据包变大,分片的可能性就会增加,到时候异常出现的概率就会增大。

因为我们知道收到的数据是4个字节,所以,我们可以分配一个4自己的空间,等到一满,我们就知道已经收到该有的数据包了,就直接处理就好。来改一下我们的TimeClientHandler

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler2 extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buf = ctx.alloc().buffer(4);//1
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buf.release();//1
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m);//2
        m.release();

        if (buf.readableBytes() >= 4) {//3
            final long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 重写了handlerAddedhandlerRemoved方法,在这两个方法里面初始化或者销毁buf对象。只要这两个方法不会阻塞太长时间,是没有关系的。
  2. 把收到的内容写到buf对象里面。
  3. 每次有数据过来的时候会进入channelRead方法(不同的连接不会串),做一个业务逻辑判断。

第二种解决方案

虽然上面的问题是解决了,但是因为我们晓得发过来的数据是4个字节的(就一个字段),所以比较好处理。但是,如果这个对象是一个比较复杂的业务对象,那么要维护这个类就会比较麻烦。

我们可以对这个TimeClientHandler2的功能拆解成2部分。

  1. TimeDecoder专门处理数据包分片的问题。
  2. TimeClientHandler2还是保持简单。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class TimeDecoder  extends ByteToMessageDecoder {//1
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//2
        if (in.readableBytes()<4)
            return;//3
        out.add(in.readBytes(4));//4
    }
}
  1. ByteToMessageDecoder继承自ChannelInboundHandlerAdapter,实现了ChannelInboundHandler接口,所以这个Decoder对象也是一个ChannelHandler对象。他专门用来处理分片问题。
  2. ByteToMessageDecoder会在有新的数据进来的时候调用decode方法,内部维护一个buffer。
  3. ByteToMessageDecoder可以根据自己的业务逻辑来执行。
  4. 假设进来的字节数据大于4,那么他就会调用这个decode多次,每次处理4个字节。

使用POJO而不是ByteBuf

前面的例子读写数据的核心都是ByteBuf类,在ChannelHandler里面直接把object msg 转成ByteBuf,然后操作。如果我们能通过POJO来操作,那么,代码的可维护性明显会高一些。让我们来改造一下我们的代码。

第一步,定义一个UnixTime类,来表示我们要处理的对象。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time2;

import java.util.Date;

public class UnixTime {
    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "转换出来的时间是:"+ new Date((getValue() - 2208988800L) * 1000L).toString();
    }
}

第二步,我们改一下我们的TimeDecoder来产生一个UnixTime对象。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time2.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class TimeDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes()<4)
            return;
        out.add(new UnixTime(in.readUnsignedInt()));
    }
}

第三步,在ChannelHandler里面我们直接按照UnixTime对象操作。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time2.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UnixTime time = (UnixTime) msg;
        System.out.println(time);
        ctx.close();
    }
}

第四步,同理,server端也可以类似的修改。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time2.server;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ChannelFuture channelFuture = ctx.writeAndFlush(new UnixTime());
        channelFuture.addListener(ChannelFutureListener.CLOSE);
    }
}

相较于以前的分配空间的操作,明显简单了许多。

第五步,现在,还缺一个东西,一个encoder,用来把UnixTime转成ByteBuf,这个是逃不开的,哈哈。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time2.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        UnixTime m = (UnixTime) msg;
        ByteBuf buffer = ctx.alloc().buffer(4);
        buffer.writeInt((int)m.getValue());
        ctx.write(buffer,promise);//1
    }
}
  1. 这一行里面有一些比较重要的事情。
    1. 这里有个ChannelPromise对象,来标记write成功与否。
    2. 我们没有手动调用flush方法,因为ChannelOutboundHandlerAdapter有个flush会自动调用。

其实这个用MessageToByteEncoder泛型还能更简单一点

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package io.netty.example.time2.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class TimeEncoder2 extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception {
        out.writeInt((int) msg.getValue());
    }
}

MessageToByteEncoder是一个ChannelOutboundHandlerAdapter的实现抽象类,专门负责把POJO对象转成ByteBuf。

最后一步,把Encoder像之前Decoder一样加到ChannelPipeline里面。你懂的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
client.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());//客户端解码 加进去
    }
});
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
server.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new TimeEncoder2(), new TimeServerHandler());//服务端直接操作UnixTime,需要编码,加进去
            }
        })
        .option(ChannelOption.SO_BACKLOG, 128)
        .childOption(ChannelOption.SO_KEEPALIVE, true);

代码结构看起来是这样的

关闭程序

关闭比较简单,调用shutdownGracefully()即可,然后会返回一个Future对象。

小结

强烈建议看看官方的例子

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-05-11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
【Java】Netty创建网络服务端客户端(TCP/UDP)
Netty是一个基于Java的异步事件驱动的网络应用程序框架,专门用于快速开发高性能、可扩展和可维护的网络服务器和客户端。它提供了简单而强大的API,使开发人员能够轻松地构建各种网络应用,包括TCP、UDP、HTTP、WebSocket等。
DevFrank
2024/07/24
8730
【Java】Netty创建网络服务端客户端(TCP/UDP)
分布式专题|都说netty入门很难,那是因为你没有看我的文章!
在写代码之前,我们先看下netty的线程模型,这比那固定格式的代码将会更有趣,看完线程模型,你就知道netty写的那几段固定代码的意义了。
AI码师
2020/12/26
1.6K0
分布式专题|都说netty入门很难,那是因为你没有看我的文章!
Netty 私有协议粘包拆包实例
接下来,采用 Java + Netty 模拟该组件的功能,以演示私有协议下 netty 的粘包/拆包的实现。
鲍远林
2022/03/26
9600
09-Netty 高性能架构设计-基于Netty开发TCP服务
Netty快速入门实例-TCP服务 需求 使用IDEA创建Netty项目 Netty服务器在6668端口监听, 客户端能发送消息给服务器"Hello, 服务器~" 服务器可以回复消息给客户端"hello, 客户端~" 目的: 对Netty线程模型 有一个初步认识, 便于理解Netty 模型理论 编写服务端 编写客户端 对Netty程序进行分析, 看看Netty模型特点 添加Netty依赖 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -
彼岸舞
2022/02/18
5040
Netty In Action中国版 – 第二章:第一Netty程序
本章将简介Netty的核心概念,这个狠心概念就是学习Netty是怎样拦截和处理异常。对于刚開始学习netty的读者。利用netty的异常拦截机制来调试程序问题非常有帮助。本章还会介绍其它一些核心概念。如server和client的启动以及分离通道的处理程序。本章学习一些基础以便后面章节的深入学习。
全栈程序员站长
2022/07/06
4100
Netty In Action中国版 – 第二章:第一Netty程序
Netty(三)之数据之粘包拆包
客户端发送10个长度的字符串,因为设置了长度为5的定长解码器,所以服务器收到2条消息
CBeann
2023/12/25
1980
Netty(三)之数据之粘包拆包
Netty(一)之helloworld
客户端通向服务器端发送消息,服务器端读取数据(你好)并且返回(new Date()),客户端读取数据
CBeann
2023/12/25
1540
java架构之路-(netty专题)netty的基本使用和netty聊天室
  上次博客,我们主要说了我们的IO模型,BIO同步阻塞,NIO同步非阻塞,AIO基于NIO二次封装的异步非阻塞,最重要的就是我们的NIO,脑海中应该有NIO的模型图。
小菜的不能再菜
2020/02/23
1.3K0
Netty之入门案例
  前面给大家介绍了NIO,我们会发现用NIO实现异步非阻塞的网络通信代码量非常大,而且并不是很好理解,在实际的开发中一般我们也都是会实现基于NIO的框架来操作的,比如Netty,这样开发效率有高而且Bug也少。
用户4919348
2019/04/18
5250
Netty之入门案例
10分钟快速入门Netty 比写NIO爽百倍
为了解决NIO 编码复杂,但是又想使用NIO,所以netty来了,netty 通过对nio复杂的api进行了封装,使得netty在具备高性能、高吞吐量、低延迟的前提下,还能方便开发人员进行快速开发。
AI码师
2022/12/22
2890
Netty入门(Netty4.x使用指南)
现如今,我们使用通用的应用程序或库来相互通信。例如,我们经常使用HTTP客户端库从服务器上获取信息并通过web服务执行远程过程调用。但是,通用协议或它的实现有时并不能很好的伸缩。这就像我们不会使用通用HTTP服务器来交换大文件、电子邮件、还有像金融信息、游戏数据等实时信息。这些业务所需要的是高度优化实现协议,用于专门的目的。例如,您可能希望实现一个针对基于ajax的聊天应用程序、媒体流应用、大文件传输进行优化的http服务器。您甚至可能想要设计并实现一个完全符合您的需求的新协议。另一个不可避免的情况是,你不得不去处理一个遗留的专有协议,来保证和旧系统的互操作性。在这些情况下,重要的是在不牺牲最终应用程序的稳定性和性能的前提前,如何尽可能快的实现该协议。
小诸葛
2020/04/14
1.1K0
Netty的TCP粘包/拆包(源码二)
假设客户端分别发送了两个数据包D1和D2给服务器,由于服务器端一次读取到的字节数是不确定的,所以可能发生四种情况:
用户3003813
2018/09/06
9040
Netty的TCP粘包/拆包(源码二)
Netty Review - 探索Pipeline的Inbound和Outbound
我们知道当boss线程监控到绑定端口上有accept事件,此时会为该socket连接实例化Pipeline,并将InboundHandler和OutboundHandler按序加载到Pipeline中,然后将该socket连接(也就是Channel对象)挂载到selector上。
小小工匠
2023/12/01
3140
Netty Review - 探索Pipeline的Inbound和Outbound
Netty第一个入门实例-TCP服务
好了netty的第一个入门案例就到此了,具体的代码解释请多看代码中的注释,有不清楚的欢迎留言交流 O(∩_∩)O哈哈~
用户4919348
2019/12/31
5K0
Netty Review - 核心组件扫盲
如果Handler处理器有一些长时间的业务处理,可以交给taskQueue异步处理。
小小工匠
2023/11/15
5110
Netty Review - 核心组件扫盲
Netty | 属于你的第一款Netty应用程序
我在这里偷懒了,直接是导入了netty-all,如果不想的话,大家可以用到什么导入什么,因为Netty支持的特别广,所以有不同的Jar包。
宁在春
2022/10/31
3200
Netty | 属于你的第一款Netty应用程序
Netty(1):第一个netty程序
为什么选择Netty   netty是业界最流行的NIO框架之一,它的健壮型,功能,性能,可定制性和可扩展性都是首屈一指的,Hadoop的RPC框架Avro就使用了netty作为底层的通信框架,此外netty在互联网,大数据,网络游戏,企业应用,电信软件等众多行业都得到了成功的商业应用。正因为以上的一些特性,使得netty已经成为java NIO编程的首选框架。 构建netty开发环境 其实使用netty很简单,直接将其jar包引入到工程中即可使用。 去 http://netty.io/网站上下载最新版本
SecondWorld
2018/03/14
1.2K0
Netty入门_合并财务报表应用指南
一个简单的NIO服务端程序,如果我们直接使用JDK的NIO类库进行开发,竟然需要经过繁琐的十多步操作才能完成最基本的消息读取和发送,这也是我们选择Netty框架的原因了。下面我们看看Netty是如何轻松搞定服务端开发的。
全栈程序员站长
2022/11/09
3550
Netty Review - Netty自动重连机制揭秘:原理与最佳实践
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
小小工匠
2023/12/25
1.4K0
Netty Review - Netty自动重连机制揭秘:原理与最佳实践
18-Netty 编解码器和Handler的调用机制
基本说明 Netty的组件设计: Netty的主要组件有Channel, EventLoop, ChannelFuture, ChannelHandler, ChannelPipeline等 ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器, 例如: 实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter), 你就可以接收入站事件和数据, 这些数据会被业务逻辑处理, 当要给客户端发送响应时, 也可以从ChannelInbound
彼岸舞
2022/02/18
1.2K0
18-Netty 编解码器和Handler的调用机制
相关推荐
【Java】Netty创建网络服务端客户端(TCP/UDP)
更多 >
LV.1
这个人很懒,什么都没有留下~
目录
  • 序言
    • 问题
  • 解决方案
  • 开始使用
    • 开始之前
    • 编写一个Discard Server
      • 前半部分
      • 后半部分
    • 收到的数据
    • 写一个Echo Server
    • 写一个TIME Server
    • 写一个TIME Client
    • 处理基于stream的传输
      • Socket Buffer的一个问题
      • 第一种解决方案
      • 第二种解决方案
    • 使用POJO而不是ByteBuf
  • 关闭程序
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档