目录:Protobuf编写客户端(Netty客户端、客户端处理类);Protobuf编写服务端(Netty服务端、Netty服务端处理类)
上一篇文章讲了如何使用Protobuf,以及如何将Protobuf文件编译成Java文件;本篇将介绍如何将Protobuf集成到Netty的客户端与服务端中。
其中几个关键类的作用:
/**
 * Netty client that connects to an echo server and exchanges
 * length-prefixed Protobuf {@code Message.Person} messages with it.
 */
public class EchoClient {

    /** Port used when no valid port is given on the command line. */
    private static final int DEFAULT_PORT = 8888;

    /**
     * Connects to {@code host:port}, installs the Protobuf codec pipeline and
     * blocks until the channel is closed.
     *
     * @param host       remote host to connect to
     * @param port       remote port to connect to
     * @param sendNumber value handed to {@code EchoClientHandler}; it controls
     *                   how many messages that handler sends
     * @throws Exception if connecting or waiting on the channel fails
     */
    public void connect(String host, int port, int sendNumber) throws Exception {
        // Client-side NIO event loop group.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                // TCP connect timeout in milliseconds.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Inbound: resolve half/sticky packets before decoding,
                        // using the length carried in the frame header.
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // Inbound: deserialize the frame bytes into a Message.Person.
                        ch.pipeline().addLast(
                            new ProtobufDecoder(Message.Person.getDefaultInstance()));
                        // Outbound: prepend a simple header holding only the
                        // serialized byte length.
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        // Outbound: serialize Protobuf messages to bytes.
                        ch.pipeline().addLast(new ProtobufEncoder());
                        // Business logic handler.
                        ch.pipeline().addLast(new EchoClientHandler(sendNumber));
                    }
                });
            // Start the asynchronous connect and wait for it to complete.
            ChannelFuture f = b.connect(host, port).sync();
            // Block until the client channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Always release the event loop threads, even when connect fails.
            group.shutdownGracefully();
        }
    }

    /**
     * Reads the port from the first command-line argument.
     *
     * @param args command-line arguments; may be {@code null} or empty
     * @return the parsed port, or {@link #DEFAULT_PORT} when the argument is
     *         absent or not a valid integer
     */
    private static int parsePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            // Best effort: keep running on the default port, but say so.
            System.err.println("Invalid port '" + args[0]
                + "', falling back to " + DEFAULT_PORT);
            return DEFAULT_PORT;
        }
    }

    /**
     * Entry point: connects to localhost on the port given as the first
     * argument (default 8888) and asks the handler to send 100 messages.
     *
     * @param args optional single argument: the server port
     * @throws Exception if the connection fails
     */
    public static void main(String[] args) throws Exception {
        int port = parsePort(args);
        int sendNumber = 100;
        new EchoClient().connect("localhost", port, sendNumber);
    }
}
/**
 * Client-side business handler: sends a batch of {@code Message.Person}
 * messages once the channel becomes active and prints every message it
 * receives.
 */
public class EchoClientHandler extends ChannelHandlerAdapter {

    /** Number of objects written to the send buffer when the channel activates. */
    private final int sendNumber;

    /**
     * @param sendNumber number of messages to send on channel activation
     */
    public EchoClientHandler(int sendNumber) {
        this.sendNumber = sendNumber;
    }

    /**
     * Builds a list of {@code userNum} Person messages.
     *
     * <p>Person {@code i} gets id {@code i}, name {@code "Admin" + i} and three
     * phone entries: MOBILE "10010", HOME "10086" and WORK "10000".
     *
     * @param userNum number of Person messages to build
     * @return the list of built messages, empty when {@code userNum <= 0}
     */
    private List<Message.Person> getPersonList(int userNum) {
        List<Message.Person> personList = new ArrayList<>();
        for (int i = 0; i < userNum; i++) {
            Message.Person.Builder personBuilder = Message.Person.newBuilder();
            personBuilder.setId(i);
            personBuilder.setName("Admin" + i);
            personBuilder.addPhone(
                Message.Person.Phone.newBuilder()
                    .setNumber("10010")
                    .setType(Message.Person.PhoneType.MOBILE));
            personBuilder.addPhone(
                Message.Person.Phone.newBuilder()
                    .setNumber("10086")
                    .setType(Message.Person.PhoneType.HOME));
            personBuilder.addPhone(
                Message.Person.Phone.newBuilder()
                    .setNumber("10000")
                    .setType(Message.Person.PhoneType.WORK));
            personList.add(personBuilder.build());
        }
        return personList;
    }

    /**
     * Writes and flushes {@code sendNumber} Person messages as soon as the
     * channel is active.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for (Message.Person person : getPersonList(sendNumber)) {
            ctx.writeAndFlush(person);
        }
    }

    /** Prints each message received on the channel. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Client receive the msgpack message : " + msg);
    }

    /** Flushes any pending outbound data after a read batch completes. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /**
     * Reports the failure and closes the channel.
     *
     * <p>The cause is printed rather than discarded so that decode and I/O
     * errors are visible when the connection drops.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Netty server that accepts connections and exchanges length-prefixed
 * Protobuf {@code Message.Person} messages through {@code EchoServerHandler}.
 */
public class EchoServer {

    /** Port used when no valid port is given on the command line. */
    private static final int DEFAULT_PORT = 8888;

    /**
     * Binds to {@code port}, installs the Protobuf codec pipeline on every
     * accepted channel and blocks until the listening channel is closed.
     *
     * @param port local port to listen on
     * @throws Exception if binding or waiting on the channel fails
     */
    public void bind(int port) throws Exception {
        // Server-side NIO event loop groups: one passed as the parent (boss)
        // group and one as the child (worker) group of the bootstrap.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Inbound: resolve half/sticky packets before decoding,
                        // using the length carried in the frame header.
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // Inbound: deserialize the frame bytes into a Message.Person.
                        ch.pipeline().addLast(
                            new ProtobufDecoder(Message.Person.getDefaultInstance()));
                        // Outbound: prepend a simple header holding only the
                        // serialized byte length.
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        // Outbound: serialize Protobuf messages to bytes.
                        ch.pipeline().addLast(new ProtobufEncoder());
                        // Business logic handler.
                        ch.pipeline().addLast(new EchoServerHandler());
                    }
                });
            // Bind the port and wait synchronously for success.
            ChannelFuture f = b.bind(port).sync();
            // Block until the server's listening channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Always release both groups' threads, even when bind fails.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Reads the port from the first command-line argument.
     *
     * @param args command-line arguments; may be {@code null} or empty
     * @return the parsed port, or {@link #DEFAULT_PORT} when the argument is
     *         absent or not a valid integer
     */
    private static int parsePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            // Best effort: keep running on the default port, but say so.
            System.err.println("Invalid port '" + args[0]
                + "', falling back to " + DEFAULT_PORT);
            return DEFAULT_PORT;
        }
    }

    /**
     * Entry point: starts the server on the port given as the first argument
     * (default 8888).
     *
     * @param args optional single argument: the port to listen on
     * @throws Exception if the server fails to start
     */
    public static void main(String[] args) throws Exception {
        new EchoServer().bind(parsePort(args));
    }
}
/**
 * Server-side business handler: prints every received message and writes it
 * back to the sender.
 */
public class EchoServerHandler extends ChannelHandlerAdapter {

    /**
     * Prints the received message and queues it to be written back; the
     * actual flush happens in {@link #channelReadComplete}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Server receive the msgpack message : " + msg);
        ctx.write(msg);
    }

    /** Flushes the messages queued by {@link #channelRead} after a read batch. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /**
     * Reports the failure and closes the channel.
     *
     * <p>The cause is printed rather than discarded so that decode and I/O
     * errors are visible when a connection drops.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        // An exception occurred: close the channel.
        ctx.close();
    }
}