用SpringBoot集成Netty开发一个基于WebSocket的聊天室
前言
基于SpringBoot,借助Netty管理长连接,使用WebSocket协议做一个实时的聊天室。
项目效果
项目统一登录路径: http://localhost:8080/chat/netty
用户名随机生成;用户离线时调用异步方法,将聊天数据写入数据库;登录后显示历史聊天消息
GitHub
项目名:InChat
项目地址:https://github.com/UncleCatMy...
项目介绍:基于Netty4与SpringBoot,聊天室WebSocket(文字图片)加API调用Netty长连接执行发送消息(在线数、用户列表)、IoT物联网-MQTT协议、TCP/IP协议单片机通信,异步存储聊天数据
代码实操讲解
随机命名工具类
/**
 * Static helper that produces a random one-character user name.
 *
 * <p>The character is drawn uniformly from the inclusive range
 * {@code U+4E00}..{@code U+9FA5}.
 */
public final class RandomNameUtil {

    /** Lowest character value that may be returned. */
    private static final int FIRST_CHAR = 0x4e00;

    /** Highest character value that may be returned. */
    private static final int LAST_CHAR = 0x9fa5;

    /** Number of distinct characters in the range (both ends inclusive). */
    private static final int RANGE_SIZE = LAST_CHAR - FIRST_CHAR + 1;

    /** Shared generator; {@link Random} is safe for use by multiple threads. */
    private static final Random RANDOM = new Random();

    /** Not instantiable: this class only exposes static helpers. */
    private RandomNameUtil() {
    }

    /**
     * Returns a random character to be used as a user name.
     *
     * @return a character between {@code U+4E00} and {@code U+9FA5}, inclusive
     */
    public static char getName() {
        return (char) (FIRST_CHAR + RANDOM.nextInt(RANGE_SIZE));
    }
}
配置文件yml
# Spring data source and JPA settings.
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/nettychat?characterEncoding=utf-8&useSSL=false
  jpa:
    show-sql: true

# Netty server settings (bound by prefix "netty").
netty:
  port: 8090        # listening port
  bossThread: 2     # number of boss threads
  workerThread: 2   # number of worker threads
  keepalive: true   # keep connections alive
  backlog: 100
数据库准备
SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for user_msg
-- ----------------------------
DROP TABLE IF EXISTS `user_msg`;
CREATE TABLE `user_msg` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `msg` varchar(255) DEFAULT NULL,
  -- create_time is set once on insert; only update_time follows later updates.
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of user_msg
-- ----------------------------
INSERT INTO `user_msg` VALUES ('1', '亪', '今天不开心', '2018-08-14 14:26:02', '2018-08-14 14:26:02');
INSERT INTO `user_msg` VALUES ('2', '祐', '不错呀', '2018-08-14 15:09:40', '2018-08-14 15:09:40');
INSERT INTO `user_msg` VALUES ('3', '搈', '开心 开心', '2018-08-14 15:09:40', '2018-08-14 15:09:40');
INSERT INTO `user_msg` VALUES ('4', '兇', '可以的,后面再做个深入一点的', '2018-08-14 15:18:35', '2018-08-14 15:18:35');
INSERT INTO `user_msg` VALUES ('5', '倎', '开源这个项目', '2018-08-14 15:18:35', '2018-08-14 15:18:35');
INSERT INTO `user_msg` VALUES ('6', '蝡', '1-someting', '2018-08-14 15:24:28', '2018-08-14 15:24:28');
INSERT INTO `user_msg` VALUES ('7', '弔', '不行呀', '2018-08-14 15:24:29', '2018-08-14 15:24:29');
INSERT INTO `user_msg` VALUES ('8', '習', '可以的', '2018-08-14 15:26:03', '2018-08-14 15:26:03');
INSERT INTO `user_msg` VALUES ('9', '蔫', '开源这个项目', '2018-08-14 15:26:03', '2018-08-14 15:26:03');
dataObject与JPA数据DAO
/**
 * JPA entity holding one chat message: who sent it, its text, and its
 * creation/update timestamps.
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 */
@Data
@Entity
@DynamicUpdate
public class UserMsg implements Serializable {

    private static final long serialVersionUID = 4133316147283239759L;

    /** Primary key, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    /** Name of the user who sent the message. */
    private String name;

    /** Text of the message. */
    private String msg;

    /** When the row was created. */
    private Date createTime;

    /** When the row was last updated. */
    private Date updateTime;
}
public interface UserMsgRepository extends JpaRepository {//本次未使用到自定义方法,JPA原生即可}
NoSQL模拟环境
我没有去配置虚拟机环境,就本地模拟了
保存用户名称与连接随机ID
/**
 * In-memory stand-in for a key/value store, backed by a
 * {@link ConcurrentHashMap}.
 *
 * <p>Because the backing map is a {@code ConcurrentHashMap}, neither keys nor
 * values may be {@code null}.
 */
@Component
public class LikeRedisTemplate {

    /** Backing store; entries are added, read and removed by key. */
    private final Map<Object, Object> redisMap = new ConcurrentHashMap<>();

    /**
     * Stores {@code name} under {@code id}, replacing any previous value.
     *
     * @param id   key to store under
     * @param name value to store
     */
    public void save(Object id, Object name) {
        redisMap.put(id, name);
    }

    /**
     * Removes the entry stored under {@code id}, if any.
     *
     * @param id key to remove
     */
    public void delete(Object id) {
        redisMap.remove(id);
    }

    /**
     * Looks up the value stored under {@code id}.
     *
     * @param id key to look up
     * @return the stored value, or {@code null} when there is no entry
     */
    public Object get(Object id) {
        return redisMap.get(id);
    }
}
聊天内容临时存储
/**
 * In-memory buffer of chat messages that have not been persisted yet.
 *
 * <p>All access to the underlying set is synchronized on this instance, so
 * the buffer can be filled from connection-handling threads while another
 * thread snapshots and clears it.
 */
@Component
public class LikeSomeCacheTemplate {

    /** Buffered messages in insertion order; guarded by {@code this}. */
    private final Set<UserMsg> someCache = new LinkedHashSet<>();

    /**
     * Buffers one chat message.
     *
     * @param user sender; stored as {@code String.valueOf(user)}
     * @param msg  message body; stored as {@code String.valueOf(msg)}
     */
    public synchronized void save(Object user, Object msg) {
        UserMsg userMsg = new UserMsg();
        userMsg.setName(String.valueOf(user));
        userMsg.setMsg(String.valueOf(msg));
        someCache.add(userMsg);
    }

    /**
     * Returns a snapshot of the buffered messages.
     *
     * <p>The result is an independent copy: iterating it is safe while other
     * threads keep calling {@link #save(Object, Object)}, and later changes to
     * the buffer are not reflected in it.
     *
     * @return a new set containing the currently buffered messages, in insertion order
     */
    public synchronized Set<UserMsg> cloneCacheMap() {
        return new LinkedHashSet<>(someCache);
    }

    /** Discards every buffered message. */
    public synchronized void clearCacheMap() {
        someCache.clear();
    }
}
异步任务处理
@Componentpublic class MsgAsyncTesk { @Autowired private LikeSomeCacheTemplate cacheTemplate; @Autowired private UserMsgRepository userMsgRepository; @Async public Future saveChatMsgTask() throws Exception{// System.out.println("启动异步任务"); Set set = cacheTemplate.cloneCacheMap(); for (UserMsg item:set){ //保存用户消息 userMsgRepository.save(item); } //清空临时缓存 cacheTemplate.clearCacheMap(); return new AsyncResult(true); } }
netty核心
配置类
/**
 * Settings for the Netty server, bound from configuration properties under
 * the {@code netty} prefix. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyAccountConfig {

    /** Port the server listens on. */
    private int port;

    /** Thread count for the boss event loop group. */
    private int bossThread;

    /** Thread count for the worker event loop group. */
    private int workerThread;

    /** Value used for the {@code SO_KEEPALIVE} channel option. */
    private boolean keepalive;

    /** Value used for the {@code SO_BACKLOG} channel option. */
    private int backlog;
}
核心消息处理类
@Component@Qualifier("textWebSocketFrameHandler")@ChannelHandler.Sharablepublic class TextWebSocketFrameHandler extends SimpleChannelInboundHandler{ public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Autowired private LikeRedisTemplate redisTemplate; @Autowired private LikeSomeCacheTemplate cacheTemplate; @Autowired private MsgAsyncTesk msgAsyncTesk; @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { Channel incoming = ctx.channel(); String uName = String.valueOf(redisTemplate.get(incoming.id())); for (Channel channel : channels) { //将当前每个聊天内容进行存储 System.out.println("存储数据:"+uName+"-"+msg.text()); cacheTemplate.save(uName,msg.text()); if (channel != incoming){ channel.writeAndFlush(new TextWebSocketFrame("[" + uName + "]" + msg.text())); } else { channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() )); } } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress()); String uName = String.valueOf(RandomNameUtil.getName()); //用来获取一个随机的用户名,可以用其他方式代替 //新用户接入 Channel incoming = ctx.channel(); for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame("[新用户] - " + uName + " 加入")); } redisTemplate.save(incoming.id(),uName); //存储用户 channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); String uName = String.valueOf(redisTemplate.get(incoming.id())); //用户离开 for (Channel channel : channels) { channel.writeAndFlush(new TextWebSocketFrame("[用户] - " + uName + " 离开")); } redisTemplate.delete(incoming.id()); //删除用户 channels.remove(ctx.channel()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("用户:"+redisTemplate.get(incoming.id())+"在线"); } @Override public void channelInactive(ChannelHandlerContext 
ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("用户:"+redisTemplate.get(incoming.id())+"掉线"); msgAsyncTesk.saveChatMsgTask(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("用户:" + redisTemplate.get(incoming.id()) + "异常"); cause.printStackTrace(); ctx.close(); } }
定义Initializer
@Component@Qualifier("somethingChannelInitializer")public class NettyWebSocketChannelInitializer extends ChannelInitializer { @Autowired private TextWebSocketFrameHandler textWebSocketFrameHandler; @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(textWebSocketFrameHandler); //这里不能使用new,不然在handler中不能注入依赖 } }
启动创建Netty基本组件
@Componentpublic class NettyConfig { @Autowired private NettyAccountConfig nettyAccountConfig; @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully") public NioEventLoopGroup bossGroup(){ return new NioEventLoopGroup(nettyAccountConfig.getBossThread()); } @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully") public NioEventLoopGroup workerGroup(){ return new NioEventLoopGroup(nettyAccountConfig.getWorkerThread()); } @Bean(name = "tcpSocketAddress") public InetSocketAddress tcpPost(){ return new InetSocketAddress(nettyAccountConfig.getPort()); } @Bean(name = "tcpChannelOptions") public Map, Object> tcpChannelOptions(){ Map, Object> options = new HashMap, Object>(); options.put(ChannelOption.SO_KEEPALIVE, nettyAccountConfig.isKeepalive()); options.put(ChannelOption.SO_BACKLOG, nettyAccountConfig.getBacklog()); return options; } @Autowired @Qualifier("somethingChannelInitializer") private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer; @Bean(name = "serverBootstrap") public ServerBootstrap bootstrap(){ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup(), workerGroup()) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(nettyWebSocketChannelInitializer); Map, Object> tcpChannelOptions = tcpChannelOptions(); Set> keySet = tcpChannelOptions.keySet(); for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) { b.option(option, tcpChannelOptions.get(option)); } return b; } }
服务启动协助类
/**
 * Starts and stops the Netty server using the injected bootstrap and
 * listen address.
 */
@Data
@Component
public class TCPServer {

    @Autowired
    @Qualifier("serverBootstrap")
    private ServerBootstrap serverBootstrap;

    @Autowired
    @Qualifier("tcpSocketAddress")
    private InetSocketAddress tcpPort;

    /**
     * The bound server channel; {@code null} until {@link #start()} has bound it.
     * Volatile because {@link #stop()} may run on a different thread than
     * {@link #start()}.
     */
    private volatile Channel serverChannel;

    /**
     * Binds the server and then blocks until the server channel is closed.
     *
     * <p>The channel is stored as soon as the bind completes, before blocking,
     * so that {@link #stop()} can close it while this method is still waiting.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void start() throws Exception {
        serverChannel = serverBootstrap.bind(tcpPort).sync().channel();
        serverChannel.closeFuture().sync();
    }

    /**
     * Closes the server channel (and its parent, when it has one). Does
     * nothing if the server was never started.
     *
     * @throws Exception declared for interface compatibility
     */
    @PreDestroy
    public void stop() throws Exception {
        Channel channel = serverChannel;
        if (channel == null) {
            return;
        }
        channel.close();
        // A server channel normally has no parent; guard against null.
        Channel parent = channel.parent();
        if (parent != null) {
            parent.close();
        }
    }
}
项目启动
@SpringBootApplication@EnableScheduling //启动异步任务public class NettychatApplication { public static void main(String[] args) throws Exception{ ConfigurableApplicationContext context = SpringApplication.run(NettychatApplication.class, args); //注入NettyConfig 获取对应Bean NettyConfig nettyConfig = context.getBean(NettyConfig.class); //注入TCPServer 获取对应Bean TCPServer tcpServer = context.getBean(TCPServer.class); //启动websocket的服务 tcpServer.start(); } }
GitHub
项目名:InChat
项目地址:https://github.com/UncleCatMy...
项目介绍:基于Netty4与SpringBoot,聊天室WebSocket(文字图片)加API调用Netty长连接执行发送消息(在线数、用户列表)、IoT物联网-MQTT协议、TCP/IP协议单片机通信,异步存储聊天数据
领取专属 10元无门槛券
私享最新 技术干货