前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty 主从多线程

Netty 主从多线程

作者头像
FoamValue
修改2020-08-31 17:31:55
1.3K0
修改2020-08-31 17:31:55
举报
文章被收录于专栏:FoamValue

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. https://netty.io

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Netty 是一个 NIO 客户端服务器框架,它支持快速轻松地开发网络应用程序,如协议服务器和客户端。它极大地简化了网络编程,如 TCP 和 UDP 套接字服务器。

Nonblocking I/O

NIO,非阻塞 IO。对比于BIO(Blocking I/O,阻塞IO),NIO 的并发性能得到了很大提高。

常见的五种 IO 模型对比

  • 同步阻塞 IO(BIO)阻塞整个步骤。适用于少连接且延迟低的场景。
  • 同步非阻塞 IO(NIO),阻塞业务处理但不阻塞数据接收。适用于高并发且处理简单的场景。
  • 多路复用 IO,数据请求和业务处理是两个分开进行处理。
  • 信号驱动 IO,主要用在嵌入式开发,不参与讨论。
  • 异步 IO,数据请求和业务处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。

主从多线程

Netty 是典型的 Reator 模型结构。

Reactor 模式是基于事件驱动开发的,其核心组成部分包括 Reactor 和线程池。其中 Reactor 负责监听和分配事件,而线程池负责处理事件。

根据Reactor的数量和线程池的数量,又将Reactor分为三种模型:

  • 单线程模型 (单 Reactor 单线程)
  • 多线程模型 (单 Reactor 多线程)
  • 主从多线程模型 (多 Reactor 多线程)

什么是主从多线程

从一个主线程 NIO 线程池中选择一个线程(boss)作为 Acceptor 线程,绑定监听端口,接收客户端连接的连接,其他线程(worker)负责后续的业务处理工作。


示例代码

从开源项目中截取了一段 Netty 初始化代码片段。

代码语言:javascript
复制
private void start() {
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY
        // 参数的作用就是控制是否启用 Nagle 算法。
        .childOption(ChannelOption.TCP_NODELAY, true)
        .handler(new LoggingHandler(LogLevel.INFO))
        // 当客户端第一次进行请求的时候才会进行初始化
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) {
            // 30 秒之内没有收到客户端请求的话就关闭连接
            ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
          }
        })
        // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY
        // 参数的作用就是控制是否启用 Nagle 算法。
        .childOption(ChannelOption.TCP_NODELAY, true)
        // 是否开启 TCP 底层心跳机制
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        // 表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
        .option(ChannelOption.SO_BACKLOG, 128);
    // 绑定端口,同步等待绑定成功
    ChannelFuture f = b.bind(host, port).sync();
    ChannelFuture f2 = b.bind(host, port2).sync();
    // 等待服务端监听端口关闭
    f.channel().closeFuture().sync();
  } catch (InterruptedException e) {
    System.out.println(String.format("occur exception when start server:", e));
  } finally {
    System.out.println(String.format("shutdown bossGroup and workerGroup"));
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
  }
}

b.bind(host, port).sync() 和 b.bind(host, port2).sync() 同时绑定了两个 ip 和 端口。

运行结果


源代码知识点

NioEventLoopGroup

特殊的 EventExecutorGroup 接口类,它允许注册已处理的通道,以便在事件循环期间进行后续选择。

ServerBootstrap

Bootstrap 子类,可轻松引导 ServerChannel。

NioServerSocketChannel

一个 ServerSocketChannel 接口的实现类,它使用基于NIO选择器的实现来接受新连接。

ChannelFuture

异步 Channel I / O操作的结果,未完成或已完成。


代码调试

new NioEventLoopGroup()

MultithreadEventExecutorGroup.java 初始化实例。

代码语言:javascript
复制
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

传入参数:

  1. nThreads 此实例将使用的线程数。
  2. executor 需要执行的 Runable 任务对象 。
  3. choicerFactory 创建 EventExecutorChooser 对象的工厂类。
  4. args 参数将传递给每个 newChild 调用。

new ServerBootstrap()

ServerBootstrap 是 Netty 服务端应用开发的入口。

ServerBootstrap 的配置:

  • group 方法,设置初始化的主从"线程池"。
  • channel 方法,设置通道类型。服务端:NioServerSocketChannel。
  • ...

b.bind(host, port).sync()

代码语言:javascript
复制
绑定并侦听某个端口
代码语言:javascript
复制
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

小结

突如其来的三天小长假,彻底打乱了生活节奏 。

一天搬家、一天休息、一天加班。眼见着明天周日应该好好学习知识了,迎来的却是正常班。

周六熬夜写文章,然后明天早起上班去。就这样。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FoamValue 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 源代码知识点
    • NioEventLoopGroup
      • ServerBootstrap
        • ChannelFuture
        • 代码调试
          • new ServerBootstrap()
            • b.bind(host, port).sync()
            相关产品与服务
            云服务器
            云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档