首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于SpringBoot3实现SSE(Server send event)服务端消息推送实战

基于SpringBoot3实现SSE(Server send event)服务端消息推送实战

作者头像
码农编程进阶笔记
发布2025-08-25 10:15:58
发布2025-08-25 10:15:58
18400
代码可运行
举报
运行总次数:0
代码可运行

SSE是使用text/event-stream格式发送的,浏览器会自动将数据解析为事件。服务端只能单向的推送事件到客户端,客户端不能发送消息到服务端。

这里只是一个简单的示例,实际应用中,用户可能有不同得终端类型,后端也会集群部署以及防止消息漏送的问题。可能还会有性能相关的问题。

前端 通过使用EventSource对象来创建一个sse连接,然后通过addEventListener来监听事件。

代码语言:javascript
代码运行次数:0
运行
复制
const id = "as12121"
const eventSource = new EventSource('http://localhost:8080/sse?userId='+id,{
});
eventSource.addEventListener("message", (event) => {
  console.log(event);
});

后端

1.创建连接,需要防止重复推送的问题

代码语言:javascript
代码运行次数:0
运行
复制
  @GetMapping("/sse")
    @CrossOrigin(value = "*")
    public ResponseBodyEmitter sseServer(@RequestParam String userId){
        ResponseBodyEmitteremitter=null;
        if ((emitter = emitterMap.get(userId)) != null){
            return emitter;
        }
        // 判断其他服务器有没有对应的连接,有的话,就算了。直接返回。或者直接转发。可以通过直接调用或者通过mq推送之类的
        emitter = newSseEmitter(300000L);
        emitter.onTimeout(()->{
            emitterMap.remove(userId);
            log.info("timeout");
        });
        emitter.onCompletion(()->{
            emitterMap.remove(userId);
            log.info("completion");
        });
        // 在客户端断开连接的时候会触发error回调
        emitter.onError(e->{
            emitterMap.remove(userId);
            log.error("error",e);
        });
        log.info("create for {}",userId);
        emitterMap.put(userId, emitter);
        return emitter;
    }

2.推送消息

代码语言:javascript
代码运行次数:0
运行
复制
private void send(String message){
        emitterMap.values().forEach(emitter -> {
            try {
                doSend(emitter,message);
            } catch (IOException e) {
                log.warn("客户端断开连接了");
            }
        });
    }

3.清理连接,需要定时清理不活跃的连接

代码语言:javascript
代码运行次数:0
运行
复制
private voidscheduleCleanup() {
        newScheduledThreadPoolExecutor(1).scheduleAtFixedRate(()->{
            emitterMap.values().removeIf(emitter -> {
                try{
                    ping(emitter);
                    returnfalse;
                }catch (IOException e){
                    log.warn("清理一个不活跃的客户端");
                    returntrue;
                }
            });
        }, 0, 10, TimeUnit.SECONDS);
    }

4.完整代码

代码语言:javascript
代码运行次数:0
运行
复制

/**
 * 1. 建立连接
 * 2. 推送消息
 * 3. 清理连接
 *
 * 为了防止重复推送,需要保证全局唯一性
 */
@Slf4j
@RestController
@RequestMapping
publicclassSSEController {
    Map<String, ResponseBodyEmitter> emitterMap = newConcurrentHashMap<>();
    /**
     * 定时清理不活跃的客户端
     */
    @PostConstruct
    privatevoidscheduleCleanup() {
        newScheduledThreadPoolExecutor(1).scheduleAtFixedRate(()->{
            emitterMap.values().removeIf(emitter -> {
                try{
                    ping(emitter);
                    returnfalse;
                }catch (IOException e){
                    log.warn("清理一个不活跃的客户端");
                    returntrue;
                }
            });
        }, 0, 10, TimeUnit.SECONDS);
    }
    /**
     * 建立连接
     */
    @GetMapping("/sse")
    @CrossOrigin(value = "*")
    public ResponseBodyEmitter sseServer(@RequestParam String userId){
        ResponseBodyEmitteremitter=null;
        if ((emitter = emitterMap.get(userId)) != null){
            return emitter;
        }
        // 判断其他服务器有没有对应的连接,有的话,就算了。直接返回。或者直接转发。可以通过直接调用或者通过mq推送之类的
        emitter = newSseEmitter(300000L);
        emitter.onTimeout(()->{
            emitterMap.remove(userId);
            log.info("timeout");
        });
        emitter.onCompletion(()->{
            emitterMap.remove(userId);
            log.info("completion");
        });
        // 在客户端断开连接的时候会触发error回调
        emitter.onError(e->{
            emitterMap.remove(userId);
            log.error("error",e);
        });
        log.info("create for {}",userId);
        emitterMap.put(userId, emitter);
        return emitter;
    }
    /**
     * 推送消息,只需要通过emitter发送即可
     */
    @GetMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        send(message);
        return ResponseEntity.ok("ok");
    }
    privatevoidping(ResponseBodyEmitter emitter)throws IOException {
        Set<ResponseBodyEmitter.DataWithMediaType> dataWithMediaTypes = SseEmitter.event()
                .id(UUID.randomUUID().toString())
                .name("ping")
                .data("ping")
                .comment("comment")
                .build();
        emitter.send(dataWithMediaTypes);
    }
    privatevoidsend(String message){
        emitterMap.values().forEach(emitter -> {
            try {
                doSend(emitter,message);
            } catch (IOException e) {
                log.warn("客户端断开连接了");
            }
        });
    }
    privatevoiddoSend(ResponseBodyEmitter emitter,String message)throws IOException {
        Set<ResponseBodyEmitter.DataWithMediaType> dataWithMediaTypes = SseEmitter.event()
                .id(UUID.randomUUID().toString())
                .name("message")
                .data(message)
                .build();
        emitter.send(dataWithMediaTypes);
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-08-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农编程进阶笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档