Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka+WebSocket=实时数据大屏

Kafka+WebSocket=实时数据大屏

作者头像
857技术社区
发布于 2022-05-17 07:04:21
发布于 2022-05-17 07:04:21
3.1K02
代码可运行
举报
文章被收录于专栏:857-Bigdata857-Bigdata
运行总次数:2
代码可运行

一、WebSocket简介

WebSocket网上很多教程,这里不详细描述。简单来说:WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

二、SpringBoot实现WebSocket

maven依赖如下

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        <!--Web项目必须加上-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- springboot websocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--Kafka依赖包-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.3.0</version>
        </dependency>

编写以下代码启用WebSocket

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author 李奇峰
 * 2019年5月10日11:08:22
 * websocket的配置
 */
@Configuration
public class WebSocketStompConfig{
    @Bean
    public ServerEndpointExporter serverEndpointExporter()
    {
        return new ServerEndpointExporter();
    }
}

编写WebSockerServer类

此类中的session连接会话全都保存在了一个静态的Map对象websocketClients 中,在开启连接时将连接会话根据连接名保存在此Map中,方便后续Kafka发送消息时进行全局调用。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.fsl.springbootwebsocket.config;

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author 2019年5月10日11:08:16
 */
@Component
@ServerEndpoint("/websocket/{socketname}")
public class WebSocketServer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * 以通道名称为key,连接会话为对象保存起来
     */
    public static Map<String, Session> websocketClients = new ConcurrentHashMap<String, Session>();
    /**
     * 会话
     */
    private Session session;
    /**
     * 通道名称
     */
    private String socketname;

    /**
     * 发送消息到指定连接
     * @param socketname 连接名
     * @param jsonString 消息
     */
    public static void sendMessage(String socketname,String jsonString){
        Session nowsession = websocketClients.get(socketname);
        if(nowsession!=null){
            try {
                nowsession.getBasicRemote().sendText(jsonString);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @OnOpen
    public void onOpen(@PathParam("socketname") String socketname, Session session)
    {

        this.socketname = socketname;
        this.session = session;
        if(websocketClients.get(socketname)==null){
            websocketClients.put(socketname, session);
            System.out.println("当前socket通道"+socketname+"已加入连接");
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        logger.info("服务端发生了错误"+error.getMessage());
    }
    /**
     * 连接关闭
     */
    @OnClose
    public void onClose()
    {
        websocketClients.remove(socketname);
        System.out.println("当前socket通道"+socketname+"已退出连接");

    }

    /**
     * 收到客户端的消息
     *
     * @param message 消息
     * @param session 会话
     */
    @OnMessage
    public void onMessage(String message, Session session){
        System.out.println("当前收到了消息:"+message);
        session.getAsyncRemote().sendText("来自服务器:"+this.socketname+"你的消息我收到啦");
    };

    /**
     * 向所有连接主动推送消息
     * @param jsonObject 消息体
     * @throws IOException
     */
    public void sendMessageAll(JSONObject jsonObject) throws IOException {
        for (Session item : websocketClients.values()) {
            item.getAsyncRemote().sendText(jsonObject.toJSONString());
        }
    }

}

三、Kafka实现

此消费者在消费消息时,会调用WebSockerServer类中的sendMessage函数,将消息发送到websocket中 此类继承了Thread类,因为Kafka运行时会一直监听通道中的消息,为了避免进程阻塞,我们将其作为单独的线程来运行

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.*;

import static com.fsl.springbootwebsocket.config.WebSocketServer.sendMessage;

public class SocketConsumer extends Thread {

    @Override
    public void run(){
        Properties prop = new Properties();
        System.out.println("启动kafka消费者....");
        prop.put("bootstrap.servers","cdh3:9092");
        prop.put("group.id","socket");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//如果是之前存在的group.id
        Consumer consumer = new KafkaConsumer(prop);
        consumer.subscribe(Arrays.asList("zeek_test"));
        while (true) {
            ConsumerRecords<String, String> c = consumer.poll(100);
            for(ConsumerRecord<String, String> c1: c) {
                System.out.println(c1.value());
                sendMessage("socket",c1.value());
            }
        }
    }
}

在此类在SpringBoot注册并启动

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import org.springframework.stereotype.Component;

@Component
public class ConsumerLinster  {
    public ConsumerLinster(){
        System.out.println("启用Kafka监听器");
        SocketConsumer socketConsumer = new SocketConsumer();
        socketConsumer.start();
        System.out.println("Kafka监听器启用成功");
    }
}

此项目整体的目录结构如下图所示

四、测试

将此项目运行后,打开http://www.websocket-test.com/此网址即可进行在线测试

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 857Hub 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spring Boot 集成 WebSocket,轻松实现信息推送!
在一次项目开发中,使用到了Netty 网络应用框架,以及 MQTT 进行消息数据的收发,这其中需要后台来将获取到的消息主动推送给前端,于是就使用到了MQTT,特此记录一下。
Java技术栈
2021/04/21
1.2K0
从零玩转Websocket实时通讯服务之前后端分离版本
前言 公司项目需要用到消息提示,那么WebSocket它来了经过我面向百度的学习,废话不多说直接开干. 后端搭建 一、依赖导入 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> 二、搭建websocket服务 1.WebSocke
杨不易呀
2022/01/19
1.2K0
java 实现 springboot项目 使用socket推送消息,前端实时进行接收后端推送的消息(亲测有效)
这里写目录标题 1 后端 1.1 添加依赖 1.2 创建配置类WebSocketConfig 1.3 创建WebSocketServer 1.4 测试类 2 前端接收 1 后端 1.1 添加依赖 在我们的springboot项目里面,添加依赖; <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</a
一写代码就开心
2022/09/27
2.9K0
java 实现 springboot项目 使用socket推送消息,前端实时进行接收后端推送的消息(亲测有效)
SpringBoot 整合websocket|实现日志实时查看
最近在做的一个功能模块:需要将项目启动后产生的任务日志实时传送到前端,方便用户能够实时看到运行的过程,相信也有很多同学做过类似的案例。
AI码师
2022/09/19
3.1K1
SpringBoot 整合websocket|实现日志实时查看
SpringBoot2整合WebSocket,实现后台向前端推送信息
WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
Li_XiaoJin
2022/06/12
3K0
SpringBoot2整合WebSocket,实现后台向前端推送信息
Websocket集群解决方案
最近在项目中在做一个消息推送的功能,比如客户下单之后通知给给对应的客户发送系统通知,这种消息推送需要使用到全双工的websocket推送消息。
用户10384376
2023/02/26
3.3K1
Websocket集群解决方案
SpringBoot集成WebSocket(原生注解方式)
WebSockets 它可以在用户的浏览器和服务器之间打开交互式通信会话。使用此API,可以向服务器发送消息并接收事件驱动的响应,而无需通过轮询服务器的方式以获得响应。 WebSocket 对象提供了用于创建和管理 WebSocket 连接,以及可以通过该连接发送和接收数据的API。 1.添加依赖 <!-- websocket--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot
甄士隐
2022/01/26
1.7K0
SpringBoot集成WebSocket(原生注解方式)
springboot+websocket实现服务端、客户端
小编最近一直在使用springboot框架开发项目,毕竟现在很多公司都在采用此框架,之后小编也会陆续写关于springboot开发常用功能的文章。
全栈程序员站长
2022/09/15
4.3K0
springboot+websocket实现服务端、客户端
springboot整合websocket技术
简单来说就是一个基于TCP的持久化的网络通信协议。主要作用就是:服务端可以主动推送信息给客户端,不需要客户端重复的向服务端发请求查询。
海加尔金鹰
2020/06/09
1.2K0
WebSocket了解一下
这两天在调试一个WebSocket的接口,折腾了一天的时间终于弄好了。现在对WebSocket的相关知识点做一个记录。主要从如下几个方面进行介绍。
码农飞哥
2021/08/18
6500
SpringBoot 整合WebSocket 简单实战案例[通俗易懂]
《Springboot 整合 WebSocket ,使用STOMP协议 ,前后端整合实战 (一)》 https://blog.csdn.net/qq_35387940/article/details/119817167
全栈程序员站长
2022/09/15
2K0
SpringBoot 整合WebSocket 简单实战案例[通俗易懂]
服务端常用的WebSocket框架
输入命令 需要修改的 url、groupId、artifactId、version
码客说
2021/01/20
1.5K0
SpringBoot之WebSocket和SSE
前言: 传统的网页都是浏览器向服务器“查询”数据,但是很多场合,最有效的方式是服务器向浏览器“发送”数据。比如,每当收到新的电子邮件,服务器就向浏览器发送一个“通知”,这要比浏览器按时向服务器查询(polling)更有效率。
王念博客
2019/07/24
1.5K0
Spring Boot使用websocket实现聊天室
先看效果:这里面demo用的是原生js跟html,方便打包一体化,在效果上我更倾向于使用vue、react等进行页面开发
余生大大
2022/11/02
9680
Spring Boot使用websocket实现聊天室
WebSocket双工通信实现用户互踢功能,一个用户同时只能在一台设备上登录需求服务端实现
最近有个需求需要控制用户在登录系统时一个用户只能在一台设备上登录。如果用户已经在一台设备上登录了,然后同一个用户又继续使用另一台设备登录,则需要踢掉在前一台设备上登录的会话,确保一个用户同一时间只有一个会话。笔者在掘金上调研了可行的技术方案,发现主要有以下两种实现方案:
用户3587585
2024/06/13
4160
WebSocket双工通信实现用户互踢功能,一个用户同时只能在一台设备上登录需求服务端实现
springboot整合websocket
我们都知道HTPP协议是基于请求响应模式,并且无状态的。HTTP通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。
全栈程序员站长
2022/09/19
1K0
springboot整合websocket
Springboot整合WebSocket(纯后端)
Java微观世界
2025/01/21
1.8K0
Springboot整合WebSocket(纯后端)
springBoot集成websocket实时消息推送
知识浅谈
2023/11/17
2.6K0
WebSocket | 为什么你前后端推送不会用?因为你少了WebSocket的帮忙
WebSocket是一种基于TCP的网络协议,实现了客户端和服务端的全双工通信,即,后端可以推送数据到客户端,客户端可以推送数据到后端。其模型如下所示:
netkiller old
2021/01/11
8620
WebSocket | 为什么你前后端推送不会用?因为你少了WebSocket的帮忙
Spring Boot集成websocket
像目前的直播、弹幕、小游戏等方面都用到了websocet进行长链接,相对于http的一次请求一次响应websocket只需要进行一次握手即长久性的建立链接进行消息互通。
余生大大
2022/11/02
1.9K0
Spring Boot集成websocket
推荐阅读
相关推荐
Spring Boot 集成 WebSocket,轻松实现信息推送!
更多 >
交个朋友
加入HAI高性能应用服务器交流群
探索HAI应用新境界 共享实践心得
加入[后端] 腾讯云技术交流站
后端架构设计 高可用系统实现
加入[游戏服务器] 腾讯云官方交流站
游戏服运维小技巧 常见问题齐排查
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验