需求
NettyServer
package com.dance.netty.netty.inandout;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加自己的处理器
// 入站的Handler进行解码 MyByteToLongDecoder
pipeline.addLast("longDecoder", new MyByteToLongDecoder());
// 对long类型进行编码的编码器
pipeline.addLast("longEncoder", new MyLongToByteEncoder());
// 处理入站数据
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture sync = serverBootstrap.bind("127.0.0.1", 7000).sync();
System.out.println("server is ready ......");
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyServerHandler
package com.dance.netty.netty.inandout;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class NettyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器收到来自客户端的消息 : " + msg.toString());
// 回显客户端
ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
NettyClient
package com.dance.netty.netty.inandout;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 解析Long类型的解析器
pipeline.addLast("longDecoder", new MyByteToLongDecoder());
// 对long类型进行编码的编码器
pipeline.addLast("longEncoder", new MyLongToByteEncoder());
// 自己的数据处理器
pipeline.addLast(new NettyClientHandler());
}
});
ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
NettyClientHandler
package com.dance.netty.netty.inandout;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
public class NettyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 发送消息到服务器
// 正常数据
ctx.writeAndFlush(1234567890L);
/*
* 分析:
* 1. "qwerqwerqwerqwer" 是16个字节
* 2. 该处理器的前一个Handler是MyLongToByteEncoder
* 3. MyLongToByteEncoder 的父类是 MessageToByteEncoder
* 4. MessageToByteEncoder 类中 有一个模板方法, 里面调用了我们自己实现的encode方法, 因为是模板方法所以里面有逻辑(重要),分析一下
* @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
* // 重点: 判断 消息是否和当前的编码器匹配 如果匹配才往下走
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
* // 转换消息类型为定义类型
I cast = (I) msg;
* // 构造成 ByteBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
* // 重点: 调用我们自己实现的encode方法 传入case, 也就是我们发送的数据
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
* // 重点: 不匹配直接发送, 不走encode方法
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
* 5. 因此我们在编写Encoder的时候, 要注意 传入的数据类型要和Encode的处理类型一致
*/
// 其他类型数据
// ctx.writeAndFlush(Unpooled.copiedBuffer("qwerqwerqwerqwer", StandardCharsets.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("客户端接收来自服务器的消息: " + msg.toString());
}
}
MyByteToLongDecoder
package com.dance.netty.netty.inandout;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 自定义解码器
*/
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
* 解码方法 会根据接收到的数据被调用多次 直到确定没有新的元素被添加到 List中
* 或者ByteBuf没有更多的可读字节为止
* 如果List out 不为空 就会将list的内容传递给下一个ChannelInboundHandler进行处理 该方法也会被调用多次
* @param ctx 上下文对象
* @param in 入站的ByteBuf
* @param out 输出给下一个调用链的数据容器
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 应为这个是一个Long的解码器, 所以只处理Long类型的数据
// 默认Long是8个字节
if(in.readableBytes() >= 8){
out.add(in.readLong());
}else{
// TODO 不够一个Long的数据, 不处理
}
}
}
MyLongToByteEncoder
package com.dance.netty.netty.inandout;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import java.util.List;
/**
* 自定义编码器
*/
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
/**
* 编码方法
* @param ctx 上下文
* @param msg 消息
* @param out 输出给下一个编码器的ByteBuf
* @throws Exception 异常
*/
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
if(msg != null){
out.writeLong(msg);
}
}
}
Server
一月 18, 2022 12:05:45 上午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x1a8323fa] REGISTERED
一月 18, 2022 12:05:45 上午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0x1a8323fa] BIND: /127.0.0.1:7000
一月 18, 2022 12:05:45 上午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x1a8323fa, L:/127.0.0.1:7000] ACTIVE
server is ready ......
一月 18, 2022 12:05:50 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x1a8323fa, L:/127.0.0.1:7000] READ: [id: 0xac268da1, L:/127.0.0.1:7000 - R:/127.0.0.1:55730]
一月 18, 2022 12:05:50 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x1a8323fa, L:/127.0.0.1:7000] READ COMPLETE
服务器收到来自客户端的消息 : 1234567890
Client
客户端接收来自服务器的消息: 1234567890
package com.dance.netty.netty.inandout;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
* 自定义解码器
*/
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 应为这个是一个Long的解码器, 所以只处理Long类型的数据
// 默认Long是8个字节
// if(in.readableBytes() >= 8){
// 不需要判断数据是否够读取 ,内部会进行处理判断
out.add(in.readLong());
// }else{
// // TODO 不够一个Long的数据, 不处理
// }
}
}