首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >从 0 到 1 打造企业级站内消息系统:架构设计与实战落地

从 0 到 1 打造企业级站内消息系统:架构设计与实战落地

作者头像
果酱带你啃java
发布2026-04-14 12:50:22
发布2026-04-14 12:50:22
100
举报

引言:为什么需要一套优秀的站内消息系统?

在当今的 Web 和移动应用中,站内消息系统已经成为连接用户与平台的重要桥梁。想象一下这些场景:当你在电商平台下单后,会收到订单状态变更的通知;在社交应用中,有人给你发消息或点赞时,你会收到提醒;在企业系统中,审批流程的每一步进展都需要及时通知相关人员。

一个设计良好的站内消息系统能够:

  • 提升用户活跃度和留存率
  • 确保重要信息及时触达用户
  • 降低用户错过关键操作的风险
  • 为未来的多渠道通知(短信、邮件等)奠定基础

然而,看似简单的消息功能背后,却隐藏着不少技术挑战:如何高效处理消息的已读 / 未读状态?如何支持未来的多渠道扩展?如何实现前端实时显示新消息?本文将从架构设计到代码实现,全面解析如何构建一套可扩展、高性能的站内消息系统。

一、需求分析与系统设计

1.1 核心需求梳理

我们需要设计的站内消息系统需满足以下核心需求:

  1. 基础消息功能发送消息、接收消息、查看消息列表
  2. 状态管理支持已读 / 未读状态,统计未读消息数量
  3. 扩展性架构设计需考虑未来扩展到短信、邮件、AI 电话提醒等多渠道
  4. 实时性前端能够实时接收并显示新的未读消息
  5. 可靠性确保消息不丢失、不重复

1.2 系统架构设计

基于上述需求,我们设计如下系统架构:

架构说明:

  • 核心服务层包含消息服务、模板服务和渠道服务,实现消息的核心逻辑
  • 数据层使用 MySQL 存储消息数据,Redis 缓存未读消息数量等高频访问数据
  • 通信层提供 REST API 供客户端主动操作,WebSocket 实现实时推送
  • 渠道层抽象出统一的消息发送接口,便于扩展各种消息渠道

1.3 消息生命周期

消息从创建到被用户阅读的完整生命周期如下:

二、数据库设计

一个健壮的数据库设计是系统可靠运行的基础。我们需要设计消息相关的表结构,既要满足当前需求,又要为未来扩展预留空间。

2.1 表结构设计

2.1.1 消息表(sys_message)

存储消息的基本内容,一条消息可以被发送给多个用户。

代码语言:javascript
复制
CREATE TABLE `sys_message` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '消息ID',
  `title` varchar(255) NOT NULL COMMENT '消息标题',
  `content` text NOT NULL COMMENT '消息内容',
  `message_type` tinyint NOT NULL COMMENT '消息类型:1-系统通知,2-业务提醒,3-互动消息,4-营销消息',
  `template_id` bigint DEFAULT NULL COMMENT '消息模板ID,关联sys_message_template表',
  `sender_id` varchar(64) DEFAULT NULL COMMENT '发送者ID',
  `sender_name` varchar(128) DEFAULT NULL COMMENT '发送者名称',
  `is_global` tinyint NOT NULL DEFAULT '0' COMMENT '是否全局消息:0-否,1-是',
  `status` tinyint NOT NULL DEFAULT '1' COMMENT '状态:0-已删除,1-正常',
  `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_message_type` (`message_type`),
  KEY `idx_created_time` (`created_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='系统消息表';
代码语言:javascript
复制

2.1.2 用户消息关联表(sys_user_message)

记录用户与消息的关联关系,包括已读 / 未读状态。

代码语言:javascript
复制
CREATE TABLE `sys_user_message` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `message_id` bigint NOT NULL COMMENT '消息ID,关联sys_message表',
  `user_id` varchar(64) NOT NULL COMMENT '用户ID',
  `is_read` tinyint NOT NULL DEFAULT '0' COMMENT '是否已读:0-未读,1-已读',
  `read_time` datetime DEFAULT NULL COMMENT '阅读时间',
  `receive_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '接收时间',
  `status` tinyint NOT NULL DEFAULT '1' COMMENT '状态:0-已删除,1-正常',
  `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_message_user` (`message_id`,`user_id`) COMMENT '确保一个用户不会收到重复消息',
  KEY `idx_user_id` (`user_id`),
  KEY `idx_is_read` (`is_read`),
  KEY `idx_receive_time` (`receive_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户消息关联表';
代码语言:javascript
复制

2.1.3 消息渠道配置表(sys_message_channel)

配置各种消息渠道的参数。

代码语言:javascript
复制
CREATE TABLE `sys_message_channel` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `channel_code` varchar(32) NOT NULL COMMENT '渠道编码:IN_APP-站内消息,SMS-短信,EMAIL-邮件,AI_CALL-AI电话',
  `channel_name` varchar(64) NOT NULL COMMENT '渠道名称',
  `config` text COMMENT '渠道配置,JSON格式',
  `is_enabled` tinyint NOT NULL DEFAULT '1' COMMENT '是否启用:0-禁用,1-启用',
  `sort_order` int NOT NULL DEFAULT '0' COMMENT '排序序号',
  `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_channel_code` (`channel_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息渠道配置表';
代码语言:javascript
复制

2.1.4 消息模板表(sys_message_template)

存储消息模板,便于统一管理和维护消息格式。

代码语言:javascript
复制
CREATE TABLE `sys_message_template` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `template_code` varchar(64) NOT NULL COMMENT '模板编码',
  `template_name` varchar(128) NOT NULL COMMENT '模板名称',
  `message_type` tinyint NOT NULL COMMENT '消息类型:1-系统通知,2-业务提醒,3-互动消息,4-营销消息',
  `title_template` varchar(255) NOT NULL COMMENT '标题模板,支持变量替换',
  `content_template` text NOT NULL COMMENT '内容模板,支持变量替换',
  `channels` varchar(128) NOT NULL COMMENT '支持的渠道,逗号分隔:IN_APP,SMS,EMAIL',
  `is_enabled` tinyint NOT NULL DEFAULT '1' COMMENT '是否启用:0-禁用,1-启用',
  `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_template_code` (`template_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息模板表';
代码语言:javascript
复制

2.1.5 用户渠道偏好设置表(sys_user_channel_preference)

记录用户对各消息渠道的偏好设置。

代码语言:javascript
复制
CREATE TABLE `sys_user_channel_preference` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `user_id` varchar(64) NOT NULL COMMENT '用户ID',
  `message_type` tinyint NOT NULL COMMENT '消息类型:1-系统通知,2-业务提醒,3-互动消息,4-营销消息',
  `channels` varchar(128) NOT NULL COMMENT '首选渠道,逗号分隔:IN_APP,SMS,EMAIL',
  `is_enabled` tinyint NOT NULL DEFAULT '1' COMMENT '是否启用:0-禁用,1-启用',
  `created_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_user_type` (`user_id`,`message_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户渠道偏好设置表';
代码语言:javascript
复制

2.2 设计说明

  1. 分离消息内容和用户状态:将消息内容(sys_message)和用户的已读状态(sys_user_message)分开存储,避免同一条消息被多次存储。
  2. 支持多渠道扩展:通过消息渠道配置表和用户渠道偏好设置表,为未来扩展短信、邮件等渠道提供支持。
  3. 引入消息模板:使用消息模板统一管理消息格式,便于维护和修改。
  4. 合理的索引设计:在查询频繁的字段上建立索引,如用户 ID、消息类型、已读状态等,提高查询效率。

三、核心功能实现

3.1 项目依赖配置

首先,我们需要配置 Maven 依赖:

代码语言:javascript
复制
<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- Spring Boot Starter WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- Spring Boot Starter Data Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- MyBatis-Plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.5</version>
    </dependency>

    <!-- MySQL Connector -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.3.0</version>
        <scope>runtime</scope>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
        <scope>provided</scope>
    </dependency>

    <!-- Spring Utils -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>6.1.1</version>
    </dependency>

    <!-- Google Guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>33.1.0-jre</version>
    </dependency>

    <!-- FastJSON2 -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.32</version>
    </dependency>

    <!-- Swagger3 -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- Hutool -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.25</version>
    </dependency>
</dependencies>
代码语言:javascript
复制

3.2 实体类定义

使用 MyBatis-Plus 的实体类注解,定义与数据库表对应的实体类。

3.2.1 消息实体(SysMessage)
代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 系统消息实体类
 *
 * @author ken
 */
@Data
@TableName("sys_message")
public class SysMessage {

    /**
     * 消息ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 消息标题
     */
    private String title;

    /**
     * 消息内容
     */
    private String content;

    /**
     * 消息类型:1-系统通知,2-业务提醒,3-互动消息,4-营销消息
     */
    private Integer messageType;

    /**
     * 消息模板ID,关联sys_message_template表
     */
    private Long templateId;

    /**
     * 发送者ID
     */
    private String senderId;

    /**
     * 发送者名称
     */
    private String senderName;

    /**
     * 是否全局消息:0-否,1-是
     */
    private Integer isGlobal;

    /**
     * 状态:0-已删除,1-正常
     */
    private Integer status;

    /**
     * 创建时间
     */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /**
     * 更新时间
     */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
代码语言:javascript
复制

3.2.2 用户消息关联实体(SysUserMessage)
代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 用户消息关联实体类
 *
 * @author ken
 */
@Data
@TableName("sys_user_message")
public class SysUserMessage {

    /**
     * ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 消息ID,关联sys_message表
     */
    private Long messageId;

    /**
     * 用户ID
     */
    private String userId;

    /**
     * 是否已读:0-未读,1-已读
     */
    private Integer isRead;

    /**
     * 阅读时间
     */
    private LocalDateTime readTime;

    /**
     * 接收时间
     */
    private LocalDateTime receiveTime;

    /**
     * 状态:0-已删除,1-正常
     */
    private Integer status;

    /**
     * 创建时间
     */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /**
     * 更新时间
     */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
代码语言:javascript
复制

3.2.3 消息渠道配置实体(SysMessageChannel)
代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 消息渠道配置实体类
 *
 * @author ken
 */
@Data
@TableName("sys_message_channel")
public class SysMessageChannel {

    /**
     * ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 渠道编码:IN_APP-站内消息,SMS-短信,EMAIL-邮件,AI_CALL-AI电话
     */
    private String channelCode;

    /**
     * 渠道名称
     */
    private String channelName;

    /**
     * 渠道配置,JSON格式
     */
    private String config;

    /**
     * 是否启用:0-禁用,1-启用
     */
    private Integer isEnabled;

    /**
     * 排序序号
     */
    private Integer sortOrder;

    /**
     * 创建时间
     */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /**
     * 更新时间
     */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
代码语言:javascript
复制

3.2.4 消息模板实体(SysMessageTemplate)
代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 消息模板实体类
 *
 * @author ken
 */
@Data
@TableName("sys_message_template")
public class SysMessageTemplate {

    /**
     * ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 模板编码
     */
    private String templateCode;

    /**
     * 模板名称
     */
    private String templateName;

    /**
     * 消息类型:1-系统通知,2-业务提醒,3-互动消息,4-营销消息
     */
    private Integer messageType;

    /**
     * 标题模板,支持变量替换
     */
    private String titleTemplate;

    /**
     * 内容模板,支持变量替换
     */
    private String contentTemplate;

    /**
     * 支持的渠道,逗号分隔:IN_APP,SMS,EMAIL
     */
    private String channels;

    /**
     * 是否启用:0-禁用,1-启用
     */
    private Integer isEnabled;

    /**
     * 创建时间
     */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /**
     * 更新时间
     */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
代码语言:javascript
复制

3.2.5 用户渠道偏好设置实体(SysUserChannelPreference)
代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 用户渠道偏好设置实体类
 *
 * @author ken
 */
@Data
@TableName("sys_user_channel_preference")
public class SysUserChannelPreference {

    /**
     * ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 用户ID
     */
    private String userId;

    /**
     * 消息类型:1-系统通知,2-业务提醒,3-互动消息,4-营销消息
     */
    private Integer messageType;

    /**
     * 首选渠道,逗号分隔:IN_APP,SMS,EMAIL
     */
    private String channels;

    /**
     * 是否启用:0-禁用,1-启用
     */
    private Integer isEnabled;

    /**
     * 创建时间
     */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /**
     * 更新时间
     */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
代码语言:javascript
复制

3.3 数据访问层(DAO)

使用 MyBatis-Plus 的 BaseMapper 实现基本的数据访问操作。

3.3.1 消息 Mapper(SysMessageMapper)
代码语言:javascript
复制
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;

/**
 * 消息Mapper接口
 *
 * @author ken
 */
@Mapper
public interface SysMessageMapper extends BaseMapper<SysMessage> {
}
代码语言:javascript
复制

3.3.2 用户消息关联 Mapper(SysUserMessageMapper)
代码语言:javascript
复制
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Map;

/**
 * 用户消息关联Mapper接口
 *
 * @author ken
 */
@Mapper
public interface SysUserMessageMapper extends BaseMapper<SysUserMessage> {

    /**
     * 分页查询用户消息列表,关联消息内容
     *
     * @param page 分页参数
     * @param queryWrapper 查询条件
     * @return 分页结果,包含消息详情
     */
    IPage<Map<String, Object>> selectUserMessagePage(
            IPage<Map<String, Object>> page,
            @Param(Constants.WRAPPER) Wrapper<SysUserMessage> queryWrapper);

    /**
     * 统计用户未读消息数量
     *
     * @param userId 用户ID
     * @return 未读消息数量
     */
    Long countUnreadByUserId(@Param("userId") String userId);

    /**
     * 批量更新消息为已读状态
     *
     * @param userId 用户ID
     * @param messageIds 消息ID列表
     * @return 更新的记录数
     */
    int batchUpdateReadStatus(
            @Param("userId") String userId,
            @Param("messageIds") Iterable<Long> messageIds);
}
代码语言:javascript
复制

对应的 XML 映射文件(SysUserMessageMapper.xml):

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.message.mapper.SysUserMessageMapper">

    <select id="selectUserMessagePage" resultType="java.util.Map">
        SELECT 
            sum.user_id,
            sum.message_id,
            sum.is_read,
            sum.read_time,
            sum.receive_time,
            msg.title,
            msg.content,
            msg.message_type,
            msg.sender_id,
            msg.sender_name,
            msg.created_time as message_created_time
        FROM 
            sys_user_message sum
        LEFT JOIN 
            sys_message msg ON sum.message_id = msg.id
        ${ew.customSqlSegment}
        ORDER BY 
            sum.receive_time DESC
    </select>

    <select id="countUnreadByUserId" resultType="java.lang.Long">
        SELECT 
            COUNT(1) 
        FROM 
            sys_user_message 
        WHERE 
            user_id = #{userId} 
            AND is_read = 0 
            AND status = 1
    </select>

    <update id="batchUpdateReadStatus">
        UPDATE 
            sys_user_message 
        SET 
            is_read = 1,
            read_time = NOW(),
            updated_time = NOW()
        WHERE 
            user_id = #{userId}
            AND message_id IN 
            <foreach collection="messageIds" item="id" open="(" separator="," close=")">
                #{id}
            </foreach>
            AND is_read = 0
            AND status = 1
    </update>
</mapper>
代码语言:javascript
复制

其他 Mapper 接口的实现类似,这里不再一一列出。

3.4 枚举类定义

定义一些常用的枚举类,提高代码的可读性和可维护性。

3.4.1 消息类型枚举(MessageTypeEnum)
代码语言:javascript
复制
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * 消息类型枚举
 *
 * @author ken
 */
@Getter
@AllArgsConstructor
public enum MessageTypeEnum {

    /**
     * 系统通知
     */
    SYSTEM_NOTICE(1, "系统通知"),

    /**
     * 业务提醒
     */
    BUSINESS_REMINDER(2, "业务提醒"),

    /**
     * 互动消息
     */
    INTERACTIVE_MESSAGE(3, "互动消息"),

    /**
     * 营销消息
     */
    MARKETING_MESSAGE(4, "营销消息");

    /**
     * 类型编码
     */
    private final Integer code;

    /**
     * 类型名称
     */
    private final String name;

    /**
     * 根据编码获取枚举
     *
     * @param code 编码
     * @return 枚举,不存在则返回null
     */
    public static MessageTypeEnum getByCode(Integer code) {
        if (ObjectUtils.isEmpty(code)) {
            return null;
        }
        for (MessageTypeEnum type : values()) {
            if (type.code.equals(code)) {
                return type;
            }
        }
        return null;
    }
}
代码语言:javascript
复制

3.4.2 消息渠道枚举(MessageChannelEnum)
代码语言:javascript
复制
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * 消息渠道枚举
 *
 * @author ken
 */
@Getter
@AllArgsConstructor
public enum MessageChannelEnum {

    /**
     * 站内消息
     */
    IN_APP("IN_APP", "站内消息"),

    /**
     * 短信
     */
    SMS("SMS", "短信"),

    /**
     * 邮件
     */
    EMAIL("EMAIL", "邮件"),

    /**
     * AI电话
     */
    AI_CALL("AI_CALL", "AI电话");

    /**
     * 渠道编码
     */
    private final String code;

    /**
     * 渠道名称
     */
    private final String name;

    /**
     * 根据编码获取枚举
     *
     * @param code 编码
     * @return 枚举,不存在则返回null
     */
    public static MessageChannelEnum getByCode(String code) {
        if (StringUtils.isEmpty(code)) {
            return null;
        }
        for (MessageChannelEnum channel : values()) {
            if (channel.code.equals(code)) {
                return channel;
            }
        }
        return null;
    }
}
代码语言:javascript
复制

3.5 核心服务实现

3.5.1 消息渠道接口定义

为了支持多种消息渠道,我们定义一个统一的消息发送接口:

代码语言:javascript
复制
import com.example.message.vo.MessageSendVO;

/**
 * 消息渠道接口
 *
 * @author ken
 */
public interface MessageChannel {

    /**
     * 获取渠道编码
     *
     * @return 渠道编码
     */
    String getChannelCode();

    /**
     * 发送消息
     *
     * @param message 消息内容
     * @return 发送结果,true-成功,false-失败
     */
    boolean send(MessageSendVO message);
}
代码语言:javascript
复制

3.5.2 站内消息渠道实现
代码语言:javascript
复制
import com.example.message.channel.MessageChannel;
import com.example.message.entity.SysMessage;
import com.example.message.entity.SysUserMessage;
import com.example.message.mapper.SysMessageMapper;
import com.example.message.mapper.SysUserMessageMapper;
import com.example.message.service.NotifyService;
import com.example.message.vo.MessageSendVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

/**
 * 站内消息渠道实现
 *
 * @author ken
 */
@Slf4j
@Component
public class InAppMessageChannel implements MessageChannel {

    private final SysMessageMapper messageMapper;
    private final SysUserMessageMapper userMessageMapper;
    private final NotifyService notifyService;

    public InAppMessageChannel(SysMessageMapper messageMapper,
                             SysUserMessageMapper userMessageMapper,
                             NotifyService notifyService) {
        this.messageMapper = messageMapper;
        this.userMessageMapper = userMessageMapper;
        this.notifyService = notifyService;
    }

    @Override
    public String getChannelCode() {
        return "IN_APP";
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean send(MessageSendVO message) {
        log.info("通过站内消息渠道发送消息,接收用户:{},消息标题:{}",
                message.getReceiverIds(), message.getTitle());

        try {
            // 1. 保存消息主表
            SysMessage sysMessage = new SysMessage();
            sysMessage.setTitle(message.getTitle());
            sysMessage.setContent(message.getContent());
            sysMessage.setMessageType(message.getMessageType());
            sysMessage.setTemplateId(message.getTemplateId());
            sysMessage.setSenderId(message.getSenderId());
            sysMessage.setSenderName(message.getSenderName());
            sysMessage.setIsGlobal(CollectionUtils.isEmpty(message.getReceiverIds()) ? 1 : 0);
            sysMessage.setStatus(1);
            messageMapper.insert(sysMessage);

            // 如果是全局消息,这里可以有专门的处理逻辑,比如异步批量插入所有用户
            // 这里简化处理,只处理指定用户
            if (!CollectionUtils.isEmpty(message.getReceiverIds())) {
                // 2. 批量保存用户消息关联
                List<SysUserMessage> userMessages = message.getReceiverIds().stream()
                        .map(receiverId -> {
                            SysUserMessage userMessage = new SysUserMessage();
                            userMessage.setMessageId(sysMessage.getId());
                            userMessage.setUserId(receiverId);
                            userMessage.setIsRead(0);
                            userMessage.setReceiveTime(LocalDateTime.now());
                            userMessage.setStatus(1);
                            return userMessage;
                        })
                        .collect(Collectors.toList());

                // 批量插入
                if (!CollectionUtils.isEmpty(userMessages)) {
                    userMessageMapper.batchInsert(userMessages);
                }

                // 3. 推送实时通知
                notifyService.notifyNewMessage(sysMessage.getId(), message.getReceiverIds());
            }

            log.info("站内消息发送成功,消息ID:{}", sysMessage.getId());
            return true;
        } catch (Exception e) {
            log.error("站内消息发送失败", e);
            return false;
        }
    }
}
代码语言:javascript
复制

3.5.3 渠道服务实现

渠道服务用于管理和选择合适的消息渠道:

代码语言:javascript
复制
import com.example.message.channel.MessageChannel;
import com.example.message.entity.SysUserChannelPreference;
import com.example.message.enums.MessageChannelEnum;
import com.example.message.mapper.SysUserChannelPreferenceMapper;
import com.example.message.vo.MessageSendVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * 消息渠道服务
 *
 * @author ken
 */
@Slf4j
@Service
public class MessageChannelService {

    private final Map<String, MessageChannel> channelMap;
    private final SysUserChannelPreferenceMapper userChannelPreferenceMapper;

    /**
     * 构造函数,自动注入所有MessageChannel实现
     *
     * @param channels 消息渠道列表
     * @param userChannelPreferenceMapper 用户渠道偏好Mapper
     */
    public MessageChannelService(List<MessageChannel> channels,
                               SysUserChannelPreferenceMapper userChannelPreferenceMapper) {
        this.channelMap = channels.stream()
                .collect(Collectors.toMap(MessageChannel::getChannelCode, channel -> channel));
        this.userChannelPreferenceMapper = userChannelPreferenceMapper;
    }

    /**
     * 获取所有可用的消息渠道
     *
     * @return 消息渠道列表
     */
    public List<MessageChannel> getAllChannels() {
        return Lists.newArrayList(channelMap.values());
    }

    /**
     * 根据渠道编码获取消息渠道
     *
     * @param channelCode 渠道编码
     * @return 消息渠道,不存在则返回null
     */
    public MessageChannel getChannelByCode(String channelCode) {
        if (StringUtils.isEmpty(channelCode)) {
            return null;
        }
        return channelMap.get(channelCode);
    }

    /**
     * 为消息选择合适的渠道并发送
     *
     * @param message 消息内容
     * @return 发送结果
     */
    public boolean sendMessage(MessageSendVO message) {
        log.info("开始发送消息,标题:{},接收用户:{}", message.getTitle(), message.getReceiverIds());

        // 1. 获取消息模板指定的渠道
        List<String> candidateChannels = Lists.newArrayList();
        if (StringUtils.hasText(message.getChannels())) {
            candidateChannels = Arrays.asList(message.getChannels().split(","));
        } else {
            // 如果没有指定渠道,默认使用站内消息
            candidateChannels.add(MessageChannelEnum.IN_APP.getCode());
        }

        // 2. 过滤掉不可用的渠道
        List<String> availableChannels = candidateChannels.stream()
                .filter(channelCode -> channelMap.containsKey(channelCode))
                .collect(Collectors.toList());

        if (CollectionUtils.isEmpty(availableChannels)) {
            log.error("没有可用的消息渠道,消息发送失败");
            return false;
        }

        // 3. 对每个接收用户,根据其偏好选择渠道发送
        if (!CollectionUtils.isEmpty(message.getReceiverIds())) {
            // 批量获取用户的渠道偏好
            Map<String, String> userChannelMap = getUserChannelPreferences(
                    message.getReceiverIds(), message.getMessageType());

            // 对每个用户单独处理
            for (String userId : message.getReceiverIds()) {
                // 获取用户偏好的渠道
                String userChannels = userChannelMap.getOrDefault(userId, String.join(",", availableChannels));
                List<String> userChannelList = Arrays.asList(userChannels.split(","));

                // 选择用户偏好的第一个可用渠道
                String selectedChannel = userChannelList.stream()
                        .filter(availableChannels::contains)
                        .findFirst()
                        .orElse(availableChannels.get(0));

                // 发送消息
                MessageChannel channel = getChannelByCode(selectedChannel);
                if (ObjectUtils.isEmpty(channel)) {
                    log.warn("消息渠道不存在,渠道编码:{}", selectedChannel);
                    continue;
                }

                // 创建用户专属的消息对象
                MessageSendVO userMessage = new MessageSendVO();
                BeanUtils.copyProperties(message, userMessage);
                userMessage.setReceiverIds(Lists.newArrayList(userId));

                // 发送消息
                boolean success = channel.send(userMessage);
                if (!success) {
                    log.error("用户消息发送失败,用户ID:{},渠道:{}", userId, selectedChannel);
                }
            }
        } else if (message.getIsGlobal()) {
            // 全局消息,使用默认渠道发送
            for (String channelCode : availableChannels) {
                MessageChannel channel = getChannelByCode(channelCode);
                if (ObjectUtils.isEmpty(channel)) {
                    log.warn("消息渠道不存在,渠道编码:{}", channelCode);
                    continue;
                }

                boolean success = channel.send(message);
                if (!success) {
                    log.error("全局消息发送失败,渠道:{}", channelCode);
                }
            }
        }

        return true;
    }

    /**
     * 批量获取用户的渠道偏好
     *
     * @param userIds 用户ID列表
     * @param messageType 消息类型
     * @return 用户ID到渠道列表的映射
     */
    private Map<String, String> getUserChannelPreferences(List<String> userIds, Integer messageType) {
        if (CollectionUtils.isEmpty(userIds) || ObjectUtils.isEmpty(messageType)) {
            return Maps.newHashMap();
        }

        try {
            // 查询用户的渠道偏好
            LambdaQueryWrapper<SysUserChannelPreference> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.in(SysUserChannelPreference::getUserId, userIds)
                    .eq(SysUserChannelPreference::getMessageType, messageType)
                    .eq(SysUserChannelPreference::getIsEnabled, 1);

            List<SysUserChannelPreference> preferences = userChannelPreferenceMapper.selectList(queryWrapper);
            if (CollectionUtils.isEmpty(preferences)) {
                return Maps.newHashMap();
            }

            // 转换为Map
            return preferences.stream()
                    .collect(Collectors.toMap(
                            SysUserChannelPreference::getUserId,
                            SysUserChannelPreference::getChannels,
                            (existing, replacement) -> existing // 处理重复用户ID的情况
                    ));
        } catch (Exception e) {
            log.error("获取用户渠道偏好失败", e);
            return Maps.newHashMap();
        }
    }
}
代码语言:javascript
复制

3.5.4 消息核心服务实现
代码语言:javascript
复制
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.message.entity.SysMessageTemplate;
import com.example.message.entity.SysUserMessage;
import com.example.message.mapper.SysMessageTemplateMapper;
import com.example.message.mapper.SysUserMessageMapper;
import com.example.message.service.MessageChannelService;
import com.example.message.service.MessageService;
import com.example.message.vo.MessagePageVO;
import com.example.message.vo.MessageSendVO;
import com.example.message.vo.MessageVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * 消息服务实现
 *
 * @author ken
 */
@Slf4j
@Service
public class MessageServiceImpl implements MessageService {

    private final MessageChannelService channelService;
    private final SysMessageTemplateMapper templateMapper;
    private final SysUserMessageMapper userMessageMapper;
    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * 未读消息数量缓存键前缀
     */
    private static final String UNREAD_COUNT_PREFIX = "message:unread:count:";

    public MessageServiceImpl(MessageChannelService channelService,
                            SysMessageTemplateMapper templateMapper,
                            SysUserMessageMapper userMessageMapper,
                            RedisTemplate<String, Object> redisTemplate) {
        this.channelService = channelService;
        this.templateMapper = templateMapper;
        this.userMessageMapper = userMessageMapper;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean sendMessage(MessageSendVO message) {
        log.info("发送消息,标题:{},接收用户数量:{}",
                message.getTitle(), CollectionUtils.size(message.getReceiverIds()));

        // 参数校验
        StringUtils.hasText(message.getTitle(), "消息标题不能为空");
        StringUtils.hasText(message.getContent(), "消息内容不能为空");
        ObjectUtils.isEmpty(message.getMessageType(), "消息类型不能为空");

        // 如果指定了模板ID,则使用模板内容覆盖
        if (!ObjectUtils.isEmpty(message.getTemplateId())) {
            SysMessageTemplate template = templateMapper.selectById(message.getTemplateId());
            if (!ObjectUtils.isEmpty(template)) {
                message.setTitle(template.getTitleTemplate());
                message.setContent(template.getContentTemplate());
                message.setChannels(template.getChannels());
                if (ObjectUtils.isEmpty(message.getMessageType())) {
                    message.setMessageType(template.getMessageType());
                }
            }
        }

        // 替换消息内容中的变量
        if (!CollectionUtils.isEmpty(message.getVariables())) {
            String processedTitle = replaceVariables(message.getTitle(), message.getVariables());
            String processedContent = replaceVariables(message.getContent(), message.getVariables());
            message.setTitle(processedTitle);
            message.setContent(processedContent);
        }

        // 通过渠道服务发送消息
        return channelService.sendMessage(message);
    }

    @Override
    public boolean sendMessageByTemplate(String templateCode, MessageSendVO message) {
        log.info("通过模板发送消息,模板编码:{},接收用户数量:{}",
                templateCode, CollectionUtils.size(message.getReceiverIds()));

        // 参数校验
        StringUtils.hasText(templateCode, "模板编码不能为空");

        // 查询模板
        LambdaQueryWrapper<SysMessageTemplate> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SysMessageTemplate::getTemplateCode, templateCode)
                .eq(SysMessageTemplate::getIsEnabled, 1);
        SysMessageTemplate template = templateMapper.selectOne(queryWrapper);

        if (ObjectUtils.isEmpty(template)) {
            log.error("消息模板不存在或已禁用,模板编码:{}", templateCode);
            throw new IllegalArgumentException("消息模板不存在或已禁用");
        }

        // 设置模板信息
        message.setTemplateId(template.getId());
        message.setMessageType(template.getMessageType());
        message.setChannels(template.getChannels());

        // 发送消息
        return sendMessage(message);
    }

    @Override
    public IPage<MessageVO> getUserMessagePage(MessagePageVO pageVO) {
        log.info("查询用户消息列表,用户ID:{},页码:{},每页大小:{}",
                pageVO.getUserId(), pageVO.getPageNum(), pageVO.getPageSize());

        // 参数校验
        StringUtils.hasText(pageVO.getUserId(), "用户ID不能为空");
        if (pageVO.getPageNum() <= 0) {
            pageVO.setPageNum(1);
        }
        if (pageVO.getPageSize() <= 0 || pageVO.getPageSize() > 100) {
            pageVO.setPageSize(20);
        }

        // 构建分页参数
        Page<Map<String, Object>> page = new Page<>(pageVO.getPageNum(), pageVO.getPageSize());
        // 构建查询条件
        LambdaQueryWrapper<SysUserMessage> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SysUserMessage::getUserId, pageVO.getUserId())
                .eq(SysUserMessage::getStatus, 1);

        // 消息类型过滤
        if (!ObjectUtils.isEmpty(pageVO.getMessageType())) {
            queryWrapper.eq(SysUserMessage::getMessageType, pageVO.getMessageType());
        }

        // 已读/未读过滤
        if (!ObjectUtils.isEmpty(pageVO.getIsRead())) {
            queryWrapper.eq(SysUserMessage::getIsRead, pageVO.getIsRead());
        }

        // 时间范围过滤
        if (!ObjectUtils.isEmpty(pageVO.getStartTime())) {
            queryWrapper.ge(SysUserMessage::getReceiveTime, pageVO.getStartTime());
        }
        if (!ObjectUtils.isEmpty(pageVO.getEndTime())) {
            queryWrapper.le(SysUserMessage::getReceiveTime, pageVO.getEndTime());
        }

        // 执行查询
        IPage<Map<String, Object>> resultPage = userMessageMapper.selectUserMessagePage(page, queryWrapper);
        // 转换结果
        return resultPage.convert(this::convertToMessageVO);
    }

    @Override
    public Long getUnreadCount(String userId) {
        log.info("查询用户未读消息数量,用户ID:{}", userId);

        // 参数校验
        StringUtils.hasText(userId, "用户ID不能为空");

        try {
            // 先从缓存获取
            String cacheKey = UNREAD_COUNT_PREFIX + userId;
            Object cacheValue = redisTemplate.opsForValue().get(cacheKey);
            if (!ObjectUtils.isEmpty(cacheValue)) {
                return Long.parseLong(cacheValue.toString());
            }

            // 缓存未命中,从数据库查询
            Long unreadCount = userMessageMapper.countUnreadByUserId(userId);
            if (ObjectUtils.isEmpty(unreadCount)) {
                unreadCount = 0L;
            }

            // 存入缓存,设置1小时过期
            redisTemplate.opsForValue().set(cacheKey, unreadCount, 1, TimeUnit.HOURS);

            return unreadCount;
        } catch (Exception e) {
            log.error("查询用户未读消息数量失败,用户ID:{}", userId, e);
            // 异常情况下直接查询数据库
            Long unreadCount = userMessageMapper.countUnreadByUserId(userId);
            return ObjectUtils.isEmpty(unreadCount) ? 0L : unreadCount;
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean markAsRead(String userId, List<Long> messageIds) {
        log.info("标记消息为已读,用户ID:{},消息数量:{}", userId, CollectionUtils.size(messageIds));

        // 参数校验
        StringUtils.hasText(userId, "用户ID不能为空");
        if (CollectionUtils.isEmpty(messageIds)) {
            log.warn("消息ID列表为空,无需标记已读");
            return true;
        }

        try {
            // 批量更新已读状态
            int updateCount = userMessageMapper.batchUpdateReadStatus(userId, messageIds);
            log.info("标记消息为已读完成,用户ID:{},更新数量:{}", userId, updateCount);

            // 清除未读计数缓存
            clearUnreadCountCache(userId);

            return updateCount > 0;
        } catch (Exception e) {
            log.error("标记消息为已读失败,用户ID:{}", userId, e);
            throw new RuntimeException("标记消息为已读失败", e);
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean markAllAsRead(String userId) {
        log.info("标记所有消息为已读,用户ID:{}", userId);

        // 参数校验
        StringUtils.hasText(userId, "用户ID不能为空");

        try {
            // 查询所有未读消息
            LambdaQueryWrapper<SysUserMessage> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(SysUserMessage::getUserId, userId)
                    .eq(SysUserMessage::getIsRead, 0)
                    .eq(SysUserMessage::getStatus, 1);

            List<SysUserMessage> unreadMessages = userMessageMapper.selectList(queryWrapper);
            if (CollectionUtils.isEmpty(unreadMessages)) {
                log.info("用户没有未读消息,无需标记已读,用户ID:{}", userId);
                return true;
            }

            // 提取消息ID
            List<Long> messageIds = unreadMessages.stream()
                    .map(SysUserMessage::getMessageId)
                    .collect(Collectors.toList());

            // 批量更新已读状态
            int updateCount = userMessageMapper.batchUpdateReadStatus(userId, messageIds);
            log.info("标记所有消息为已读完成,用户ID:{},更新数量:{}", userId, updateCount);

            // 清除未读计数缓存
            clearUnreadCountCache(userId);

            return updateCount > 0;
        } catch (Exception e) {
            log.error("标记所有消息为已读失败,用户ID:{}", userId, e);
            throw new RuntimeException("标记所有消息为已读失败", e);
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean deleteMessage(String userId, List<Long> messageIds) {
        log.info("删除消息,用户ID:{},消息数量:{}", userId, CollectionUtils.size(messageIds));

        // 参数校验
        StringUtils.hasText(userId, "用户ID不能为空");
        if (CollectionUtils.isEmpty(messageIds)) {
            log.warn("消息ID列表为空,无需删除");
            return true;
        }

        try {
            // 逻辑删除消息(更新状态为0)
            LambdaUpdateWrapper<SysUserMessage> updateWrapper = new LambdaUpdateWrapper<>();
            updateWrapper.eq(SysUserMessage::getUserId, userId)
                    .in(SysUserMessage::getMessageId, messageIds)
                    .set(SysUserMessage::getStatus, 0);

            int deleteCount = userMessageMapper.update(null, updateWrapper);
            log.info("删除消息完成,用户ID:{},删除数量:{}", userId, deleteCount);

            // 清除未读计数缓存
            clearUnreadCountCache(userId);

            return deleteCount > 0;
        } catch (Exception e) {
            log.error("删除消息失败,用户ID:{}", userId, e);
            throw new RuntimeException("删除消息失败", e);
        }
    }

    /**
     * 清除用户未读消息数量缓存
     *
     * @param userId 用户ID
     */
    public void clearUnreadCountCache(String userId) {
        if (StringUtils.hasText(userId)) {
            String cacheKey = UNREAD_COUNT_PREFIX + userId;
            redisTemplate.delete(cacheKey);
            log.info("清除用户未读消息数量缓存,用户ID:{},缓存键:{}", userId, cacheKey);
        }
    }

    /**
     * 替换消息内容中的变量
     *
     * @param content 原始内容
     * @param variables 变量映射
     * @return 替换后的内容
     */
    private String replaceVariables(String content, Map<String, Object> variables) {
        if (StringUtils.isEmpty(content) || CollectionUtils.isEmpty(variables)) {
            return content;
        }

        String processedContent = content;
        for (Map.Entry<String, Object> entry : variables.entrySet()) {
            String placeholder = "${" + entry.getKey() + "}";
            String value = entry.getValue() != null ? entry.getValue().toString() : "";
            processedContent = processedContent.replace(placeholder, value);
        }

        return processedContent;
    }

    /**
     * 将Map转换为MessageVO
     *
     * @param map 数据库查询结果Map
     * @return MessageVO对象
     */
    private MessageVO convertToMessageVO(Map<String, Object> map) {
        if (CollectionUtils.isEmpty(map)) {
            return null;
        }

        MessageVO messageVO = new MessageVO();
        messageVO.setMessageId(ObjectUtils.isEmpty(map.get("message_id")) ? null :
                Long.parseLong(map.get("message_id").toString()));
        messageVO.setUserId(map.get("user_id") != null ? map.get("user_id").toString() : null);
        messageVO.setTitle(map.get("title") != null ? map.get("title").toString() : null);
        messageVO.setContent(map.get("content") != null ? map.get("content").toString() : null);
        messageVO.setMessageType(ObjectUtils.isEmpty(map.get("message_type")) ? null :
                Integer.parseInt(map.get("message_type").toString()));
        messageVO.setSenderId(map.get("sender_id") != null ? map.get("sender_id").toString() : null);
        messageVO.setSenderName(map.get("sender_name") != null ? map.get("sender_name").toString() : null);
        messageVO.setIsRead(ObjectUtils.isEmpty(map.get("is_read")) ? null :
                Integer.parseInt(map.get("is_read").toString()));
        messageVO.setReadTime(map.get("read_time") != null ? (LocalDateTime) map.get("read_time") : null);
        messageVO.setReceiveTime(map.get("receive_time") != null ? (LocalDateTime) map.get("receive_time") : null);
        messageVO.setCreatedTime(map.get("message_created_time") != null ? (LocalDateTime) map.get("message_created_time") : null);

        return messageVO;
    }
}
代码语言:javascript
复制

3.6 WebSocket 实时通知实现

为了实现前端实时显示新消息的功能,我们使用 WebSocket 技术:

3.6.1 WebSocket 配置
代码语言:javascript
复制
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;

/**
 * WebSocket配置
 *
 * @author ken
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    private final MessageWebSocketHandler messageWebSocketHandler;
    private final HandshakeInterceptor handshakeInterceptor;

    public WebSocketConfig(MessageWebSocketHandler messageWebSocketHandler,
                         HandshakeInterceptor handshakeInterceptor) {
        this.messageWebSocketHandler = messageWebSocketHandler;
        this.handshakeInterceptor = handshakeInterceptor;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 注册WebSocket处理器,允许所有来源的连接
        registry.addHandler(messageWebSocketHandler, "/ws/message")
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins("*");
    }

    /**
     * 握手拦截器,用于验证用户身份
     *
     * @return 握手拦截器
     */
    @Bean
    public HandshakeInterceptor handshakeInterceptor() {
        return new MessageHandshakeInterceptor();
    }
}
代码语言:javascript
复制

3.6.2 握手拦截器
代码语言:javascript
复制
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpSession;
import java.util.Map;

/**
 * 消息WebSocket握手拦截器
 *
 * @author ken
 */
public class MessageHandshakeInterceptor implements HandshakeInterceptor {

    /**
     * 握手前的处理,用于验证用户身份
     *
     * @param request 请求对象
     * @param response 响应对象
     * @param wsHandler WebSocket处理器
     * @param attributes  attributes
     * @return 是否允许握手
     * @throws Exception 异常
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            HttpSession session = servletRequest.getServletRequest().getSession(false);

            // 从请求参数中获取用户ID,实际应用中应该从登录信息中获取
            String userId = servletRequest.getServletRequest().getParameter("userId");

            if (userId != null) {
                // 将用户ID存储在属性中,供后续使用
                attributes.put("userId", userId);
                return true;
            }
        }
        return false;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                              WebSocketHandler wsHandler, Exception exception) {
        // 握手后的处理,无需实现
    }
}
代码语言:javascript
复制

3.6.3 WebSocket 处理器
代码语言:javascript
复制
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 消息WebSocket处理器
 *
 * @author ken
 */
@Slf4j
@Component
public class MessageWebSocketHandler extends TextWebSocketHandler {

    /**
     * 存储用户ID与WebSocketSession的映射
     */
    private final Map<String, WebSocketSession> userSessionMap = new ConcurrentHashMap<>();

    /**
     * 连接建立后调用
     *
     * @param session WebSocket会话
     * @throws Exception 异常
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // 从会话属性中获取用户ID
        String userId = (String) session.getAttributes().get("userId");
        if (userId != null) {
            userSessionMap.put(userId, session);
            log.info("WebSocket连接建立,用户ID:{},当前在线用户数:{}", userId, userSessionMap.size());
        }
    }

    /**
     * 处理接收到的消息
     *
     * @param session WebSocket会话
     * @param message 接收到的消息
     * @throws Exception 异常
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        log.info("收到用户消息,用户ID:{},消息内容:{}", userId, message.getPayload());

        // 这里可以处理客户端发送的消息,例如确认收到消息等
        // 简单回复一个确认消息
        session.sendMessage(new TextMessage("服务器已收到消息:" + message.getPayload()));
    }

    /**
     * 连接关闭后调用
     *
     * @param session WebSocket会话
     * @param status 关闭状态
     * @throws Exception 异常
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        if (userId != null) {
            userSessionMap.remove(userId);
            log.info("WebSocket连接关闭,用户ID:{},当前在线用户数:{}", userId, userSessionMap.size());
        }
    }

    /**
     * 向指定用户发送消息
     *
     * @param userId 用户ID
     * @param message 消息内容
     * @return 是否发送成功
     */
    public boolean sendMessageToUser(String userId, Object message) {
        if (userId == null || message == null) {
            return false;
        }

        WebSocketSession session = userSessionMap.get(userId);
        if (session == null || !session.isOpen()) {
            log.warn("用户WebSocket连接不存在或已关闭,用户ID:{}", userId);
            return false;
        }

        try {
            String jsonMessage = JSON.toJSONString(message);
            session.sendMessage(new TextMessage(jsonMessage));
            log.info("向用户发送WebSocket消息成功,用户ID:{},消息内容:{}", userId, jsonMessage);
            return true;
        } catch (IOException e) {
            log.error("向用户发送WebSocket消息失败,用户ID:{}", userId, e);
            return false;
        }
    }

    /**
     * 向多个用户发送消息
     *
     * @param userIds 用户ID列表
     * @param message 消息内容
     */
    public void sendMessageToUsers(Set<String> userIds, Object message) {
        if (userIds == null || userIds.isEmpty() || message == null) {
            return;
        }

        String jsonMessage = JSON.toJSONString(message);
        TextMessage textMessage = new TextMessage(jsonMessage);

        for (String userId : userIds) {
            WebSocketSession session = userSessionMap.get(userId);
            if (session != null && session.isOpen()) {
                try {
                    session.sendMessage(textMessage);
                    log.info("向用户发送WebSocket消息成功,用户ID:{}", userId);
                } catch (IOException e) {
                    log.error("向用户发送WebSocket消息失败,用户ID:{}", userId, e);
                }
            } else {
                log.warn("用户WebSocket连接不存在或已关闭,用户ID:{}", userId);
            }
        }
    }

    /**
     * 获取在线用户ID列表
     *
     * @return 在线用户ID列表
     */
    public Set<String> getOnlineUserIds() {
        return userSessionMap.keySet();
    }
}
代码语言:javascript
复制

3.6.4 通知服务
代码语言:javascript
复制
import com.example.message.entity.SysMessage;
import com.example.message.mapper.SysMessageMapper;
import com.example.message.vo.NotificationVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * 通知服务,用于向客户端推送实时消息
 *
 * @author ken
 */
@Slf4j
@Service
public class NotifyService {

    private final MessageWebSocketHandler webSocketHandler;
    private final SysMessageMapper messageMapper;
    private final MessageService messageService;

    public NotifyService(MessageWebSocketHandler webSocketHandler,
                       SysMessageMapper messageMapper,
                       MessageService messageService) {
        this.webSocketHandler = webSocketHandler;
        this.messageMapper = messageMapper;
        this.messageService = messageService;
    }

    /**
     * 通知用户有新消息
     *
     * @param messageId 消息ID
     * @param userIds 用户ID列表
     */
    public void notifyNewMessage(Long messageId, List<String> userIds) {
        log.info("通知用户有新消息,消息ID:{},用户数量:{}", messageId, userIds.size());

        if (ObjectUtils.isEmpty(messageId) || userIds == null || userIds.isEmpty()) {
            log.warn("消息ID或用户列表为空,无法发送通知");
            return;
        }

        try {
            // 查询消息详情
            SysMessage message = messageMapper.selectById(messageId);
            if (ObjectUtils.isEmpty(message)) {
                log.error("消息不存在,无法发送通知,消息ID:{}", messageId);
                return;
            }

            // 构建通知内容
            NotificationVO notification = new NotificationVO();
            notification.setType("NEW_MESSAGE");
            notification.setMessageId(messageId);
            notification.setTitle(message.getTitle());
            notification.setContent(message.getContent());
            notification.setSendTime(message.getCreatedTime());

            // 转换为Set,避免重复
            Set<String> userIdSet = userIds.stream().collect(Collectors.toSet());

            // 向用户发送WebSocket通知
            webSocketHandler.sendMessageToUsers(userIdSet, notification);

            // 清除用户未读计数缓存
            for (String userId : userIdSet) {
                messageService.clearUnreadCountCache(userId);
            }
        } catch (Exception e) {
            log.error("通知用户新消息失败,消息ID:{}", messageId, e);
        }
    }
}
代码语言:javascript
复制

3.7 控制器实现

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.example.message.service.MessageService;
import com.example.message.vo.MessagePageVO;
import com.example.message.vo.MessageSendVO;
import com.example.message.vo.MessageVO;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;

/**
 * 消息控制器
 *
 * @author ken
 */
@Slf4j
@RestController
@RequestMapping("/api/message")
@Tag(name = "消息接口", description = "提供消息发送、查询、已读等功能")
public class MessageController {

    private final MessageService messageService;

    public MessageController(MessageService messageService) {
        this.messageService = messageService;
    }

    @PostMapping("/send")
    @Operation(summary = "发送消息", description = "直接发送一条消息给指定用户")
    public Map<String, Object> sendMessage(@RequestBody MessageSendVO message) {
        boolean success = messageService.sendMessage(message);

        Map<String, Object> result = Maps.newHashMap();
        result.put("success", success);
        result.put("message", success ? "消息发送成功" : "消息发送失败");
        return result;
    }

    @PostMapping("/sendByTemplate")
    @Operation(summary = "通过模板发送消息", description = "使用指定的消息模板发送消息")
    public Map<String, Object> sendMessageByTemplate(
            @Parameter(description = "模板编码", required = true) @RequestParam String templateCode,
            @RequestBody MessageSendVO message) {
        boolean success = messageService.sendMessageByTemplate(templateCode, message);

        Map<String, Object> result = Maps.newHashMap();
        result.put("success", success);
        result.put("message", success ? "消息发送成功" : "消息发送失败");
        return result;
    }

    @GetMapping("/list")
    @Operation(summary = "查询用户消息列表", description = "分页查询用户的消息列表,支持筛选条件")
    public Map<String, Object> getUserMessageList(MessagePageVO pageVO) {
        IPage<MessageVO> messagePage = messageService.getUserMessagePage(pageVO);

        Map<String, Object> result = Maps.newHashMap();
        result.put("success", true);
        result.put("data", messagePage.getRecords());
        result.put("total", messagePage.getTotal());
        result.put("pages", messagePage.getPages());
        result.put("current", messagePage.getCurrent());
        result.put("size", messagePage.getSize());
        return result;
    }

    @GetMapping("/unread/count")
    @Operation(summary = "查询未读消息数量", description = "查询指定用户的未读消息总数")
    public Map<String, Object> getUnreadCount(
            @Parameter(description = "用户ID", required = true) @RequestParam String userId) {
        Long unreadCount = messageService.getUnreadCount(userId);

        Map<String, Object> result = Maps.newHashMap();
        result.put("success", true);
        result.put("data", unreadCount);
        return result;
    }

    @PostMapping("/read")
    @Operation(summary = "标记消息为已读", description = "将指定的消息标记为已读状态")
    public Map<String, Object> markAsRead(
            @Parameter(description = "用户ID", required = true) @RequestParam String userId,
            @Parameter(description = "消息ID列表", required = true) @RequestBody List<Long> messageIds) {
        boolean success = messageService.markAsRead(userId, messageIds);

        Map<String, Object> result = Maps.newHashMap();
        result.put("success", success);
        result.put("message", success ? "标记已读成功" : "标记已读失败");
        return result;
    }

    @PostMapping("/read/all")
    @Operation(summary = "标记所有消息为已读", description = "将用户的所有未读消息标记为已读状态")
    public Map<String, Object> markAllAsRead(
            @Parameter(description = "用户ID", required = true) @RequestParam String userId) {
        boolean success = messageService.markAllAsRead(userId);

        Map<String, Object> result = Maps.newHashMap();
        result.put("success", success);
        result.put("message", success ? "标记所有已读成功" : "标记所有已读失败");
        return result;
    }

    @PostMapping("/delete")
    @Operation(summary = "删除消息", description = "删除用户的指定消息")
    public Map<String, Object> deleteMessage(
            @Parameter(description = "用户ID", required = true) @RequestParam String userId,
            @Parameter(description = "消息ID列表", required = true) @RequestBody List<Long> messageIds) {
        boolean success = messageService.deleteMessage(userId, messageIds);

        Map<String, Object> result = Maps.newHashMap();
        result.put("success", success);
        result.put("message", success ? "删除消息成功" : "删除消息失败");
        return result;
    }
}
代码语言:javascript
复制

3.8 前端 WebSocket 连接示例

为了完整展示实时通知功能,我们提供一个简单的前端 WebSocket 连接示例:

代码语言:javascript
复制
// 连接WebSocket
function connectWebSocket(userId) {
    // 判断浏览器是否支持WebSocket
    if (!window.WebSocket) {
        console.error('您的浏览器不支持WebSocket,无法接收实时消息');
        return null;
    }

    // 构建WebSocket连接URL
    const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
    const host = window.location.host;
    const url = `${protocol}//${host}/ws/message?userId=${userId}`;

    // 创建WebSocket连接
    const ws = new WebSocket(url);

    // 连接成功回调
    ws.onopen = function() {
        console.log('WebSocket连接成功');
    };

    // 接收消息回调
    ws.onmessage = function(event) {
        console.log('收到新消息:', event.data);
        const message = JSON.parse(event.data);

        // 处理新消息通知
        if (message.type === 'NEW_MESSAGE') {
            handleNewMessage(message);
        }
    };

    // 连接关闭回调
    ws.onclose = function() {
        console.log('WebSocket连接关闭,尝试重新连接...');
        // 5秒后尝试重新连接
        setTimeout(() => connectWebSocket(userId), 5000);
    };

    // 连接错误回调
    ws.onerror = function(error) {
        console.error('WebSocket错误:', error);
    };

    return ws;
}

// 处理新消息
function handleNewMessage(message) {
    // 显示消息通知
    showNotification(`新消息: ${message.title}`, message.content);

    // 更新未读消息数量
    updateUnreadCount();

    // 如果当前在消息列表页面,可以刷新列表
    if (isOnMessageListPage()) {
        refreshMessageList();
    }
}

// 显示消息通知
function showNotification(title, content) {
    // 简单实现,实际项目中可以使用更美观的通知组件
    const notification = document.createElement('div');
    notification.className = 'message-notification';
    notification.innerHTML = `
        <div class="notification-title">${title}</div>
        <div class="notification-content">${content}</div>
    `;
    document.body.appendChild(notification);

    // 3秒后自动关闭通知
    setTimeout(() => {
        notification.style.opacity = '0';
        setTimeout(() => notification.remove(), 500);
    }, 3000);
}

// 更新未读消息数量
function updateUnreadCount() {
    // 调用后端接口获取最新的未读消息数量
    fetch('/api/message/unread/count?userId=' + currentUserId)
        .then(response => response.json())
        .then(data => {
            if (data.success) {
                const countElement = document.getElementById('unread-count');
                if (countElement) {
                    countElement.textContent = data.data;
                    // 如果有未读消息,显示小红点
                    if (data.data > 0) {
                        countElement.classList.add('has-unread');
                    } else {
                        countElement.classList.remove('has-unread');
                    }
                }
            }
        });
}
代码语言:javascript
复制

四、扩展功能实现

4.1 短信渠道实现

为了支持短信发送功能,我们实现一个短信渠道:

代码语言:javascript
复制
import com.example.message.channel.MessageChannel;
import com.example.message.entity.SysMessageChannel;
import com.example.message.mapper.SysMessageChannelMapper;
import com.example.message.vo.MessageSendVO;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.Map;

/**
 * 短信消息渠道实现
 *
 * @author ken
 */
@Slf4j
@Component
public class SmsMessageChannel implements MessageChannel {

    private final SysMessageChannelMapper channelMapper;
    private final SmsService smsService;

    public SmsMessageChannel(SysMessageChannelMapper channelMapper, SmsService smsService) {
        this.channelMapper = channelMapper;
        this.smsService = smsService;
    }

    @Override
    public String getChannelCode() {
        return "SMS";
    }

    @Override
    public boolean send(MessageSendVO message) {
        log.info("通过短信渠道发送消息,接收用户:{},消息标题:{}",
                message.getReceiverIds(), message.getTitle());

        try {
            // 参数校验
            if (ObjectUtils.isEmpty(message.getReceiverPhones()) || message.getReceiverPhones().isEmpty()) {
                log.error("短信接收手机号不能为空");
                return false;
            }

            // 获取短信渠道配置
            SysMessageChannel channelConfig = getChannelConfig();
            if (ObjectUtils.isEmpty(channelConfig) || !"1".equals(channelConfig.getIsEnabled())) {
                log.error("短信渠道未配置或已禁用");
                return false;
            }

            // 解析配置
            Map<String, String> configMap = parseConfig(channelConfig.getConfig());

            // 发送短信
            for (String phone : message.getReceiverPhones()) {
                // 短信内容通常有长度限制,这里做简单处理
                String smsContent = StringUtils.hasText(message.getTitle()) ? 
                        message.getTitle() + ":" + message.getContent() : message.getContent();

                // 截取过长的内容
                if (smsContent.length() > 500) {
                    smsContent = smsContent.substring(0, 500) + "...";
                }

                // 调用短信服务发送
                boolean success = smsService.sendSms(
                        configMap.get("templateId"), 
                        phone, 
                        message.getVariables());

                if (success) {
                    log.info("短信发送成功,手机号:{}", phone);
                } else {
                    log.error("短信发送失败,手机号:{}", phone);
                }
            }

            return true;
        } catch (Exception e) {
            log.error("短信发送失败", e);
            return false;
        }
    }

    /**
     * 获取渠道配置
     *
     * @return 渠道配置
     */
    private SysMessageChannel getChannelConfig() {
        return channelMapper.selectOne(
                new LambdaQueryWrapper<SysMessageChannel>()
                        .eq(SysMessageChannel::getChannelCode, getChannelCode())
                        .eq(SysMessageChannel::getIsEnabled, 1)
        );
    }

    /**
     * 解析配置JSON
     *
     * @param configJson 配置JSON字符串
     * @return 配置映射
     */
    private Map<String, String> parseConfig(String configJson) {
        if (StringUtils.isEmpty(configJson)) {
            return Maps.newHashMap();
        }

        try {
            return JSON.parseObject(configJson, Map.class);
        } catch (Exception e) {
            log.error("解析渠道配置失败,配置内容:{}", configJson, e);
            return Maps.newHashMap();
        }
    }
}
代码语言:javascript
复制

4.2 邮件渠道实现

代码语言:javascript
复制
import com.example.message.channel.MessageChannel;
import com.example.message.entity.SysMessageChannel;
import com.example.message.mapper.SysMessageChannelMapper;
import com.example.message.vo.MessageSendVO;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.util.Map;

/**
 * 邮件消息渠道实现
 *
 * @author ken
 */
@Slf4j
@Component
public class EmailMessageChannel implements MessageChannel {

    private final SysMessageChannelMapper channelMapper;
    private final JavaMailSender mailSender;

    public EmailMessageChannel(SysMessageChannelMapper channelMapper, JavaMailSender mailSender) {
        this.channelMapper = channelMapper;
        this.mailSender = mailSender;
    }

    @Override
    public String getChannelCode() {
        return "EMAIL";
    }

    @Override
    public boolean send(MessageSendVO message) {
        log.info("通过邮件渠道发送消息,接收用户:{},消息标题:{}",
                message.getReceiverIds(), message.getTitle());

        try {
            // 参数校验
            if (ObjectUtils.isEmpty(message.getReceiverEmails()) || message.getReceiverEmails().isEmpty()) {
                log.error("邮件接收地址不能为空");
                return false;
            }
            if (StringUtils.isEmpty(message.getTitle())) {
                log.error("邮件标题不能为空");
                return false;
            }
            if (StringUtils.isEmpty(message.getContent())) {
                log.error("邮件内容不能为空");
                return false;
            }

            // 获取邮件渠道配置
            SysMessageChannel channelConfig = getChannelConfig();
            if (ObjectUtils.isEmpty(channelConfig) || !"1".equals(channelConfig.getIsEnabled())) {
                log.error("邮件渠道未配置或已禁用");
                return false;
            }

            // 解析配置
            Map<String, String> configMap = parseConfig(channelConfig.getConfig());
            String fromEmail = configMap.get("fromEmail");
            if (StringUtils.isEmpty(fromEmail)) {
                log.error("发件人邮箱未配置");
                return false;
            }

            // 创建邮件消息
            MimeMessage mimeMessage = mailSender.createMimeMessage();
            MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true);
            helper.setFrom(fromEmail);
            helper.setTo(message.getReceiverEmails().toArray(new String[0]));
            helper.setSubject(message.getTitle());
            helper.setText(message.getContent(), true); // 第二个参数表示是否为HTML内容

            // 发送邮件
            mailSender.send(mimeMessage);
            log.info("邮件发送成功,收件人数量:{}", message.getReceiverEmails().size());

            return true;
        } catch (MessagingException e) {
            log.error("邮件发送失败", e);
            return false;
        } catch (Exception e) {
            log.error("邮件发送过程中发生异常", e);
            return false;
        }
    }

    /**
     * 获取渠道配置
     *
     * @return 渠道配置
     */
    private SysMessageChannel getChannelConfig() {
        return channelMapper.selectOne(
                new LambdaQueryWrapper<SysMessageChannel>()
                        .eq(SysMessageChannel::getChannelCode, getChannelCode())
                        .eq(SysMessageChannel::getIsEnabled, 1)
        );
    }

    /**
     * 解析配置JSON
     *
     * @param configJson 配置JSON字符串
     * @return 配置映射
     */
    private Map<String, String> parseConfig(String configJson) {
        if (StringUtils.isEmpty(configJson)) {
            return Maps.newHashMap();
        }

        try {
            return JSON.parseObject(configJson, Map.class);
        } catch (Exception e) {
            log.error("解析渠道配置失败,配置内容:{}", configJson, e);
            return Maps.newHashMap();
        }
    }
}
代码语言:javascript
复制

五、系统测试与性能优化

5.1 单元测试示例

代码语言:javascript
复制
import com.example.message.service.MessageService;
import com.example.message.vo.MessageSendVO;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * 消息服务单元测试
 *
 * @author ken
 */
@SpringBootTest
public class MessageServiceTest {

    @Autowired
    private MessageService messageService;

    @Test
    public void testSendMessage() {
        MessageSendVO message = new MessageSendVO();
        message.setTitle("测试消息");
        message.setContent("这是一条测试消息");
        message.setMessageType(1);
        message.setSenderId("system");
        message.setSenderName("系统");
        message.setReceiverIds(Arrays.asList("user1", "user2", "user3"));

        boolean success = messageService.sendMessage(message);
        assertTrue(success, "消息发送失败");
    }

    @Test
    public void testSendMessageByTemplate() {
        MessageSendVO message = new MessageSendVO();
        message.setSenderId("system");
        message.setSenderName("系统");
        message.setReceiverIds(Arrays.asList("user1", "user2"));

        // 设置消息变量
        Map<String, Object> variables = new HashMap<>();
        variables.put("username", "测试用户");
        variables.put("orderId", "ORDER123456");
        variables.put("amount", "99.99");
        message.setVariables(variables);

        boolean success = messageService.sendMessageByTemplate("ORDER_NOTIFY", message);
        assertTrue(success, "通过模板发送消息失败");
    }
}
代码语言:javascript
复制

5.2 性能优化策略

  1. 数据库索引优化
    • 为频繁查询的字段建立索引,如 user_id、is_read、message_type 等
    • 对于大表,可以考虑分区表,按时间或用户 ID 分区
  2. 缓存策略
    • 缓存用户未读消息数量,减少数据库查询
    • 缓存热门消息模板,避免频繁查询数据库
    • 合理设置缓存过期时间,平衡一致性和性能
  3. 异步处理
    • 消息发送采用异步处理,避免阻塞主线程
    • 使用消息队列(如 RabbitMQ、Kafka)处理消息发送,提高系统吞吐量
  4. 批量操作
    • 对于批量发送消息,采用批量插入数据库的方式
    • 批量标记已读、批量删除等操作使用批量 SQL
  5. 分库分表
    • 当消息数据量巨大时,可以考虑分库分表
    • 按用户 ID 哈希分片,或按时间范围分片
  6. WebSocket 连接优化
    • 使用连接池管理 WebSocket 连接
    • 实现心跳检测,及时清理无效连接
    • 考虑使用 WebSocket 集群,支持会话共享

六、总结与展望

本文详细介绍了如何设计和实现一套功能完善、可扩展的站内消息系统。从需求分析到架构设计,从数据库表结构到核心代码实现,我们全面覆盖了站内消息系统的各个方面。

系统的核心特点包括:

  1. 完善的消息状态管理:实现了已读 / 未读状态跟踪,支持批量标记已读和删除操作。
  2. 良好的扩展性:通过抽象消息渠道接口,为未来扩展短信、邮件、AI 电话等通知方式奠定了基础。
  3. 实时性:利用 WebSocket 技术实现了新消息的实时推送,提升了用户体验。
  4. 高性能:通过缓存、批量操作等手段,确保系统在高并发场景下的稳定运行。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-09-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言:为什么需要一套优秀的站内消息系统?
  • 一、需求分析与系统设计
    • 1.1 核心需求梳理
    • 1.2 系统架构设计
    • 1.3 消息生命周期
  • 二、数据库设计
    • 2.1 表结构设计
      • 2.1.1 消息表(sys_message)
      • 2.1.2 用户消息关联表(sys_user_message)
      • 2.1.3 消息渠道配置表(sys_message_channel)
      • 2.1.4 消息模板表(sys_message_template)
      • 2.1.5 用户渠道偏好设置表(sys_user_channel_preference)
    • 2.2 设计说明
  • 三、核心功能实现
    • 3.1 项目依赖配置
    • 3.2 实体类定义
      • 3.2.1 消息实体(SysMessage)
      • 3.2.2 用户消息关联实体(SysUserMessage)
      • 3.2.3 消息渠道配置实体(SysMessageChannel)
      • 3.2.4 消息模板实体(SysMessageTemplate)
      • 3.2.5 用户渠道偏好设置实体(SysUserChannelPreference)
    • 3.3 数据访问层(DAO)
      • 3.3.1 消息 Mapper(SysMessageMapper)
      • 3.3.2 用户消息关联 Mapper(SysUserMessageMapper)
    • 3.4 枚举类定义
      • 3.4.1 消息类型枚举(MessageTypeEnum)
      • 3.4.2 消息渠道枚举(MessageChannelEnum)
    • 3.5 核心服务实现
      • 3.5.1 消息渠道接口定义
      • 3.5.2 站内消息渠道实现
      • 3.5.3 渠道服务实现
      • 3.5.4 消息核心服务实现
    • 3.6 WebSocket 实时通知实现
      • 3.6.1 WebSocket 配置
      • 3.6.2 握手拦截器
      • 3.6.3 WebSocket 处理器
      • 3.6.4 通知服务
    • 3.7 控制器实现
    • 3.8 前端 WebSocket 连接示例
  • 四、扩展功能实现
    • 4.1 短信渠道实现
    • 4.2 邮件渠道实现
  • 五、系统测试与性能优化
    • 5.1 单元测试示例
    • 5.2 性能优化策略
  • 六、总结与展望
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档