Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【NIO】NIO版本鸿儒聊天室

【NIO】NIO版本鸿儒聊天室

作者头像
喜欢天文的pony站长
发布于 2020-07-16 13:17:23
发布于 2020-07-16 13:17:23
60000
代码可运行
举报
文章被收录于专栏:RabbitMQ实战RabbitMQ实战
运行总次数:0
代码可运行

# 需求

  • 基于NIO实现
  • 支持同时多个客户端接入
  • 支持客户端发送文本消息到服务器
  • 支持客户端自定义群聊名称
  • 接收到客户端发送的消息之后,服务器需要将消息转发给目前在线的所有其他客户端
  • 支持客户端退出群聊
  • 服务端停止服务后,客户端自动断开连接

# 技术介绍

  • Non-blockingI/O 编程模型
  • Channel 通道
    • ServerSocketChannel 服务端通道
    • SocketChannel 客户端通道
  • ByteBuffer NIO中使用的读写缓冲区
  • Selector 多路复用器
    • channel注册在多路复用器上,并监听相应的事件
  • 多线程
  • 线程池

# 代码

温馨提示:注意看代码注释哟~ 跟上节奏,很简单?

  • 服务器
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 基于NIO实现的聊天室服务端
 *
 * @author futao
 * @date 2020/7/8
 */
@Slf4j
public class NioChatServer {

    /**
     * 用于处理通道上的事件的线程池(可选的)
     */
    private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);

    /**
     * 启动聊天室
     */
    public void start() {
        try {
            //服务端Socket通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //将通道设置成非阻塞
            serverSocketChannel.configureBlocking(false);
            //绑定主机与监听端口
            serverSocketChannel.bind(new InetSocketAddress("localhost", Constants.SERVER_PORT));

            //多路复用器
            Selector selector = Selector.open();

            //将服务端通道注册到多路复用器上,并设置监听事件接入事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            log.debug("{} 基于NIO的聊天室在[{}]端口启动成功 {}", StringUtils.repeat("=", 30), Constants.SERVER_PORT, StringUtils.repeat("=", 30));

            while (true) {
                // 触发了事件的通道数量,该方法会阻塞
                int eventCountTriggered = selector.select();
                if (eventCountTriggered <= 0) {
                    continue;
                }
                // 获取到所有触发的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 遍历事件进行处理
                for (SelectionKey selectionKey : selectionKeys) {
                    // 处理事件
                    selectionKeyHandler(selectionKey, selector);
                }
                // 清除事件记录
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 事件处理器
     *
     * @param selectionKey 触发的事件信息
     * @param selector     多路复用器
     */
    private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
        if (selectionKey.isAcceptable()) {
            //如果触发的是SocketChannel接入事件
            try {
                // ServerSocketChannel上触发的客户端SocketChannel接入
                SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
                log.debug("客户端[{}]成功接入聊天服务器", socketChannel.socket().getPort());
                // 将客户端SocketChannel通道设置成非阻塞
                socketChannel.configureBlocking(false);
                // 将客户端通道注册到多路复用器,并监听这个通道上发生的可读事件
                socketChannel.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (selectionKey.isReadable()) {
            // 触发的是可读事件
            // 获取到可读事件的客户端通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
            try {
                // 读取通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
                while (socketChannel.read(byteBuffer) > 0) {
                }
                //切换为读模式
                byteBuffer.flip();
                // 接收到的消息
                String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                log.info("接收到来自客户端[{}]的数据:[{}]", socketChannel.socket().getPort(), message);
                // 是否退出
                quit(message, selector, selectionKey);
                // 消息转发
                forwardMessage(message, selector, selectionKey);
                // 清除缓冲区的数据
                byteBuffer.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 客户端退出
     *
     * @param message      消息
     * @param selector     多路复用器
     * @param selectionKey 触发的selectionKey
     */
    public void quit(String message, Selector selector, SelectionKey selectionKey) {
        if (StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message)) {
            int port = ((SocketChannel) selectionKey.channel()).socket().getPort();
            // 客户端下线
            selectionKey.cancel();
            log.debug("客户端[{}]下线", port);
            // 因为发生了监听事件和channel的变更,所以需要通知selector重新整理selector所监听的事件
            selector.wakeup();
        }
    }

    /**
     * 转发消息
     *
     * @param message         需要转发的消息
     * @param selector        多路复用器
     * @param curSelectionKey 当前触发的selectionKey
     */
    public void forwardMessage(String message, Selector selector, SelectionKey curSelectionKey) {
        // 创建缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
        // 数据写入缓冲区
        byteBuffer.put(message.getBytes(Constants.CHARSET));

        // 切换为读模式
        byteBuffer.flip();
        // 在首尾进行标记,因为需要给每个客户端发送同样的数据,需要重复读取
        byteBuffer.mark();
        // 当前注册在多路复用器上的SelectionKey集合
        Set<SelectionKey> keys = selector.keys();
        for (SelectionKey key : keys) {
            // 消息不能转发给自己 and 只转发给客户端SocketChannel
            if (curSelectionKey.equals(key) || !(key.channel() instanceof SocketChannel)) {
                continue;
            }
            // 客户端SocketChannel
            SocketChannel socketChannel = (SocketChannel) key.channel();
            // 如果缓冲区中还有数据就一直写
            while (byteBuffer.hasRemaining()) {
                try {
                    // 数据写入通道
                    socketChannel.write(byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // 重置到上次mark的地方,即首位
            byteBuffer.reset();
        }
        // 清除缓冲区的数据
        byteBuffer.clear();
    }


    public static void main(String[] args) {
        new NioChatServer().start();
    }
}
  • 客户端
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * 基于NIO实现的群聊客户端
 *
 * @author futao
 * @date 2020/7/8
 */
@Getter
@Setter
@Slf4j
public class NioChatClient {

    /**
     * 用于处理用户输入数据的单个线程线程池,使用线程池是为了便于关闭
     */
    private static final ExecutorService USER_INPUT_HANDLER = Executors.newSingleThreadExecutor();

    /**
     * 用户名
     */
    private String userName;

    /**
     * 启动客户端
     */
    public void start() {
        try {
            // 创建客户端通道
            SocketChannel socketChannel = SocketChannel.open();
            // 将通道设置为非阻塞
            socketChannel.configureBlocking(false);

            // 创建多路复用器
            Selector selector = Selector.open();

            // 将客户端通道注册到多路复用器,并监听可读事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);

            // 尝试连接到聊天服务器
            socketChannel.connect(new InetSocketAddress("localhost", Constants.SERVER_PORT));

            while (true) {
                // 阻塞等待通道上的事件触发。返回触发的通道的数量
                int eventCountTriggered = selector.select();
                if (eventCountTriggered <= 0) {
                    continue;
                }
                // 获取到所有触发的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 遍历事件进行处理
                for (SelectionKey selectionKey : selectionKeys) {
                    // 处理事件
                    selectionKeyHandler(selectionKey, selector);
                }
                // 清除事件记录
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClosedSelectorException e) {
            log.debug("成功退出聊天室...");
        }
    }

    /**
     * 处理器
     *
     * @param selectionKey 触发的selectionKey
     * @param selector     多路复用器
     */
    private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {

        if (selectionKey.isConnectable()) {
            // 触发的是成功接入服务器的事件
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            try {
                // 判断此通道上的连接操作是否正在进行中
                if (socketChannel.isConnectionPending()) {
                    // 完成连接套接字通道的过程
                    socketChannel.finishConnect();
                    log.debug("成功接入聊天服务器");

                    // 将通道设置成非阻塞
                    socketChannel.configureBlocking(false);
                    // 将通道注册到多路复用器,并监听可读事件
                    socketChannel.register(selector, SelectionKey.OP_READ);

                    // 创建缓冲区,用于处理将用户输入的数据写入通道
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);
                    // 在新线程中处理用户输入
                    USER_INPUT_HANDLER.execute(() -> {
                        while (!Thread.currentThread().isInterrupted()) {
                            //先清空缓冲区中的数据
                            byteBuffer.clear();
                            // 获取用户输入的文本
                            String message = new Scanner(System.in).nextLine();
                            // 将数据写入缓冲区
                            byteBuffer.put(String.format("【%s】: %s", userName, message).getBytes(Constants.CHARSET));
                            // 将缓冲区设置为读模式
                            byteBuffer.flip();
                            try {
                                // 当缓冲区中还有数据
                                while (byteBuffer.hasRemaining()) {
                                    // 将数据写入通道
                                    socketChannel.write(byteBuffer);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }

                            // 判断是否退出群聊
                            if (quit(message, selector, selectionKey)) {
                                // 跳出循环,结束线程
                                break;
                            }
                        }
                        try {
                            // 关闭多路复用器
                            selector.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        // 关闭线程池
                        USER_INPUT_HANDLER.shutdown();
                    });
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (selectionKey.isReadable()) {
            // 触发的是可读事件
            // 获取到可读事件的通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
            try {
                // 将通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
                while (socketChannel.read(byteBuffer) > 0) {
                }
                // 切换成读模式
                byteBuffer.flip();
                String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                byteBuffer.clear();
                log.info("接收到数据:[{}]", message);
                if (StringUtils.isBlank(message)) {
                    log.debug("服务器拉胯,下车...");
                    selector.close();
                    USER_INPUT_HANDLER.shutdownNow();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 退出群聊
     *
     * @param message      消息
     * @param selector     多路复用器
     * @param selectionKey 触发的selectionKey
     * @return 是否退出
     */
    public boolean quit(String message, Selector selector, SelectionKey selectionKey) {
        if (Constants.KEY_WORD_QUIT.equals(message)) {
            selectionKey.cancel();
            selector.wakeup();
            return true;
        }
        return false;
    }


    public static void main(String[] args) {
        NioChatClient nioChatClient = new NioChatClient();
        nioChatClient.setUserName("小9");
        nioChatClient.start();
    }
}

# 测试

  • 接入
  • 客户端发送消息
  • 消息转发
  • 客户端下线
  • 服务器宕机

# 源代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
* https://github.com/FutaoSmile/learn-IO/tree/master/practice/src/main/java/com/futao/practice/chatroom/nio

# 系列文章

【BIO】基于BIO实现简单动态HTTP服务器

【BIO】通过指定消息大小实现的多人聊天室-终极版本

BIO在聊天室项目中的演化

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-07-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 喜欢天文 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Netty基础—4.NIO的使用简介
在NIO中,所有的数据都是通过使用Buffer缓冲区来处理的。如果要通过NIO,将数据写到文件和网络或从文件和网络中读取数据,那么就需要使用Buffer缓冲区来进行处理。
东阳马生架构
2025/05/20
620
Bio、Nio、Aio的用法系列之NIO客户端(三)
上一篇文章我们提到了NIO,大家应该对NIO有了一定的了解,接下来我们继续学习NIO的客户端实现
用户1257393
2018/07/30
3690
【死磕Netty】-----NIO基础详解
原文出处http://cmsblogs.com/ 『chenssy』 转载请注明原创出处,谢谢! Netty 是基于Java NIO 封装的网络通讯框架,只有充分理解了 Java NIO 才能理解好Netty的底层设计。Java NIO 由三个核心组件组件: Buffer Channel Selector 缓冲区 Buffer Buffer 是一个数据对象,我们可以把它理解为固定数量的数据的容器,它包含一些要写入或者读出的数据。 在 Java NIO 中,任何时候访问 NIO 中的数据,都需要通过缓冲区(B
用户1655470
2018/04/26
1.3K0
【死磕Netty】-----NIO基础详解
Java网络编程——NIO处理写事件(SelectionKey.OP_WRITE)
在前面NIO的例子中,在服务端,有对连接事件(SelectionKey.OP_ACCEPT)的处理,也有对读事件(SelectionKey.OP_READ)的处理,但是没有对写事件(SelectionKey.OP_WRITE)进行处理,原因就是写事件有些特殊,在这里单独记录一下。
DannyHoo
2022/08/07
9870
Bio、Nio、Aio的用法系列之NIO服务端(二)
技术圈有很多人说NIO是new IO,是因为他是新增的接口,这也是官方说法,但是,我们知道,以前都是阻塞IO,详细见上文BIO详解,而NIO是非阻塞的,所以说,NIO更确切的说法 是non-block IO,当然关于说法,大家可以根据自己的理解,不过多做解释。 首先在讲解NIO之前我们先了解几个概念
用户1257393
2018/07/30
3460
Java NIO深入理解ServerSocketChannel
JAVA NIO有两种解释:一种叫非阻塞IO(Non-blocking I/O),另一种也叫新的IO(New I/O),其实是同一个概念。它是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器,成为解决高并发与大量连接、I/O处理问题的有效方式。
用户1251985
2019/07/02
1.5K0
Java NIO深入理解ServerSocketChannel
Netty系列| Netty创始人告诉你为什么选择NIO
NIO模型 同步非阻塞 NIO有同步阻塞和同步非阻塞两种模式,一般讲的是同步非阻塞,服务器实现模式为一个请求一个线程,但客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
狼王编程
2021/06/01
1.4K0
Netty系列| Netty创始人告诉你为什么选择NIO
Netty序章之BIO NIO AIO演变
Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠的网络服务器和客户端程序。Netty简化了网络程序的开发,是很多框架和公司都在使用的技术。更是面试的加分项。Netty并非横空出世,它是在BIO,NIO,AIO演变中的产物,是一种NIO框架。而BIO,NIO,AIO更是笔试中要考,面试中要问的技术。也是一个很好的加分项,加分就是加工资,你还在等什么?本章带你细细品味三者的不同! 流程图:
用户1212940
2022/04/13
5570
Netty序章之BIO NIO AIO演变
BIO、NIO、IO多路复用模型的演进&Java NIO 网络编程
上文介绍了网络编程的基础知识,并基于 Java 编写了 BIO 的网络编程。我们知道 BIO 模型是存在巨大问题的,比如 C10K 问题,其本质就是因其阻塞原因,导致如果想要承受更多的请求就必须有足够多的线程,但是足够多的线程会带来内存占用问题、CPU上下文切换带来的性能问题,从而造成服务端崩溃的现象。怎么解决这一问题呢?优化呗,所以后面就有了NIO、AIO、IO多路复用。本文将对这几个模型详细说明并基于 Java 编写 NIO。
王二蛋
2024/01/18
7910
Java网络编程之NIO
有人称之为New I/O,因为它是相对于之前的I/O库是新的,不过在NIO之前是BIO,即阻塞I/O,所以NIO的目标是让Java支持非阻塞的I/O,所以有人也称之为非阻塞I/O。
心平气和
2021/03/16
2780
【Netty】NIO 网络编程 聊天室案例
① 服务器 客户端 通信 : 服务器 与 客户端 实现 双向通信 ; 服务器可以写出数据到客户端 , 也能读取客户端的数据 ; 客户端可以写出数据到服务器端 , 也可以读取服务器端的数据 ;
韩曙亮
2023/03/27
1.4K0
【Netty】NIO 网络编程 聊天室案例
Java中的NIO基础知识
上一篇介绍了五种NIO模型,本篇将介绍Java中的NIO类库,为学习netty做好铺垫
Janti
2018/08/01
5580
Java中的NIO基础知识
NIO初探
缓冲区Buffer是一个对象,它包含要写入和读出的数据。在NIO中加入Buffer对象是与BIO的一个重要区别。NIO中所有数据都是通过缓冲区处理的。缓冲区实质上是一个数组,缓冲区提供对数据的结构化访问和维护读写位置等信息。
黑洞代码
2021/01/14
3480
NIO初探
JAVA网络IO之NIO/BIO
以Linux为例: 第一阶段:调用read读取socket数据,有数据则读取,没数据则等待。 第二阶段:调用read读取socket数据,有数据则读取,没数据则返回-1, errno设置为EAGAIN。 第三阶段:监听socket,有数据则通知。
程序猿川子
2025/02/19
1220
JAVA网络IO之NIO/BIO
Java NIO 同步非阻塞应用实例
项目地址:https://github.com/windwant/windwant-service/tree/master/io-service
WindWant
2020/09/11
5980
掌握NIO,程序人生
就像新IO为java带来的革新那样,让我们也开启一段新的程序人生。 关键字:NIO,BIO,伪IO,AIO,多路复用选择器,通道,缓冲区,jdk研究,回调函数,高并发 java.nio 概述 历史背景 在java nio出现之前,java网络IO是只有输入输出流操作的基于同步阻塞的Socket编程,这在实际应用中效率低下,因此当时高性能服务器开发领域一直被拥有更接近UNIX操作系统的Channel概念的C++和C长期占据。我们知道现代操作系统的根源都是来自于UNIX系统,它代表了操作系统层面底层
文彬
2018/05/08
1.3K1
BIO NIO AIO演变1 BIO2 NIO3 AIO4 常见面试题5 总结
Block-IO 是一种阻塞同步的通信模式。 常说的Socket IO 一般指的是BIO。是一个比较传统的通信方式,模式简单,使用方便。但并发处理能力低,通信耗时,依赖网速。
JavaEdge
2018/08/10
1.4K0
BIO NIO AIO演变1 BIO2 NIO3  AIO4 常见面试题5  总结
「高并发通信框架Netty4 源码解读(八)」NIO应用——聊天案例及Reactor线程模式
上面的聊天案例无论是服务端还是客户端,都是单线程的,所有的链接及读写都是在一个main方法所在的主线程内运行。
源码之路
2020/09/04
1.8K0
「高并发通信框架Netty4 源码解读(八)」NIO应用——聊天案例及Reactor线程模式
Java IO 模型之 BIO,NIO,AIO
应用场景:BIO 适合用于连接数比较小且固定的架构,这种方式对服务器资源要求比较高,但程序简单易理解。
Se7en258
2021/05/18
6920
Java IO 模型之 BIO,NIO,AIO
Java NIO
通常在进行同步I/O操作时,如果读取数据,代码会阻塞直至有 可供读取的数据。同样,写入调用将会阻塞直至数据能够写入。传统的Server/Client模式会基于TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独负责处理一个客户请求。这种模式带来的一个问题就是线程数量的剧增,大量的线程会增大服务器的开销。大多数的实现为了避免这个问题,都采用了线程池模型,并设置线程池线程的最大数量,这由带来了新的问题,如果线程池中有200个线程,而有200个用户都在进行大文件下载,会导致第201个用户的请求无法及时处理,即便第201个用户只想请求一个几KB大小的页面。传统的 Server/Client模式如下图所示:
conanma
2021/12/06
4730
相关推荐
Netty基础—4.NIO的使用简介
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验