package com.shi.netty.netty1.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author shiye
* @create 2020-06-10 10:24
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建bossGroup 和 workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
//创建服务器端启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) //设置俩个线程组
.channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 做为服务器通道的实现
.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});//给我们的workerGroup 的 Eventloop 对应的管道设置处理器
System.out.println("........server is ready..........");
//启动服务器(并绑定端口)
ChannelFuture channelFuture = bootstrap.bind(7777).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}
}
package com.shi.netty.netty1.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
* 自定义方法处理器
*
* @author shiye
* @create 2020-06-10 11:01
* 我们自定义一个handler需要集成 netty 规定好的 handlerAdapter
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据的方法
*
* @param ctx 上下文对象,其中包含管道pipeline,通道channel,地址
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// System.out.println("服务器启动的线程组为:" + Thread.currentThread().getName());
// System.out.println("server ctx = " + ctx);
// //将msg转成butbuf
// ByteBuf byteBuf = (ByteBuf) msg;
// System.out.println("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
//***********如果这边有一个任务执行的时间非常长,不能让客户端进行等待***********//
//解决方案1:把需要长时间执行的任务放到NIOEventLoop的taskQueue中(如果有多个任务时间是叠加的)
// ctx.channel().eventLoop().execute(() -> {
// try {
// TimeUnit.SECONDS.sleep(10);
// ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端终于执行完了客户端的任务了 ...", CharsetUtil.UTF_8));
//
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// });
//解决方案2:用户自定义定时任务-》该任务提交到scheduletaskqueue中
ctx.channel().eventLoop().schedule(() -> {
try {
TimeUnit.SECONDS.sleep(10);
ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端终于执行完了客户端的任务了 ...", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 5, TimeUnit.SECONDS);
}
/**
* 读取数据完成
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//把数据写入缓存,并且刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("hello client~ ...", CharsetUtil.UTF_8));
}
/**
* 捕捉异常信息
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("NettyServerHandler 捕捉到了异常..." + cause);
ctx.close();
}
}
package com.shi.netty.netty1.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* 客户端
*
* @author shiye
* @create 2020-06-10 11:25
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
int count = 500;
// List<Integer> list = new ArrayList<Integer>();
// for (int i = 0; i < count; i++) {
// list.add(i);
// }
//
// list.parallelStream().forEach((temp) -> {
// new Thread(() -> {
// NettyClient nettyClient = new NettyClient();
// try {
// nettyClient.pushClint();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
// });
NettyClient nettyClient = new NettyClient();
nettyClient.pushClint();
}
public void pushClint() throws InterruptedException {
//客户端需要一个事件监听组
EventLoopGroup eventGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventGroup) //设置线程组
.channel(NioSocketChannel.class) //设置客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7777).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//优雅的关闭服务器
eventGroup.shutdownGracefully();
}
}
}
package com.shi.netty.netty1.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author shiye
* @create 2020-06-10 11:41
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪就会激活改方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client:" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("server hello, I,m client....", CharsetUtil.UTF_8));
}
/**
* 当通道有读取事件时,会触发
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端接收到的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("远程服务器端的地址:" + ctx.channel().remoteAddress());
}
}
2. 使用 Netty 接收 HTTP 请求的简单案例
package com.shi.netty.netty1.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Demo showing Netty receiving HTTP requests: binds to port 6688 and
 * configures accepted channels with {@link TestServerInitializer}.
 *
 * @author shiye
 * @create 2020-06-15 11:06
 */
public class TestHttpServer {
    /**
     * Starts the HTTP server and blocks until the server channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the two event loop groups passed to ServerBootstrap.group(parent, child).
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            // Create the server bootstrap object and configure it.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new TestServerInitializer());
            ChannelFuture channelFuture = bootstrap.bind(6688).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently consuming the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package com.shi.netty.netty1.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
/**
 * Replies to every incoming HTTP request with a fixed plain-text body.
 *
 * <p>{@code SimpleChannelInboundHandler} extends {@code ChannelInboundHandlerAdapter};
 * {@code HttpObject} is the type into which the data exchanged between client
 * and server is wrapped.
 *
 * @author shiye
 * @create 2020-06-15 11:08
 */
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * Logs some diagnostics for each HTTP object and, when the object is an
     * {@code HttpRequest}, writes a 200 OK response.
     *
     * @param ctx context object
     * @param msg the decoded HTTP object
     * @throws Exception if handling fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        System.out.println(" pipeline:" + ctx.pipeline().hashCode() + " Handler:" + this.hashCode());
        System.out.println("msg 类型是:" + msg.getClass());
        System.out.println("客户端地址:" + ctx.channel().remoteAddress());
        if (msg instanceof HttpRequest) {
            ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器端,收到了你的请求...", CharsetUtil.UTF_8);
            // Build the HTTP response object to send back.
            HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            // The body is UTF-8 encoded and contains non-ASCII text, so declare the charset;
            // without it clients may pick a different default and show garbled characters.
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            // Send the response.
            ctx.writeAndFlush(response);
        }
    }
}
package com.shi.netty.netty1.http;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
/**
 * Channel initializer for the HTTP demo server; sets up the handlers of each
 * new channel's pipeline.
 *
 * @author shiye
 * @create 2020-06-15 11:08
 */
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Adds the handlers to the channel's pipeline, in order.
     *
     * @param ch the channel being initialized
     * @throws Exception if initialization fails
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // HttpServerCodec is the HTTP encoder/decoder provided by Netty.
                .addLast("MyHttpServerCodec", new HttpServerCodec())
                // Custom handler that produces the response.
                .addLast(new TestHttpServerHandler());
    }
}