
在当今的 Web 和移动应用中,站内消息系统已经成为连接用户与平台的重要桥梁。想象一下这些场景:当你在电商平台下单后,会收到订单状态变更的通知;在社交应用中,有人给你发消息或点赞时,你会收到提醒;在企业系统中,审批流程的每一步进展都需要及时通知相关人员。
一个设计良好的站内消息系统能够:
然而,看似简单的消息功能背后,却隐藏着不少技术挑战:如何高效处理消息的已读 / 未读状态?如何支持未来的多渠道扩展?如何实现前端实时显示新消息?本文将从架构设计到代码实现,全面解析如何构建一套可扩展、高性能的站内消息系统。
我们要设计的站内消息系统需满足以下核心需求:高效管理消息的已读 / 未读状态,支持未来的多渠道扩展(短信、邮件等),并能让前端实时显示新消息。
基于上述需求,我们设计如下系统架构:

架构说明:
消息从创建到被用户阅读的完整生命周期如下:业务方发起发送请求 → 应用消息模板并替换变量 → 根据用户偏好选择渠道 → 写入消息表和用户消息关联表 → 实时通知在线用户 → 用户查看消息并标记为已读。

一个健壮的数据库设计是系统可靠运行的基础。我们需要设计消息相关的表结构,既要满足当前需求,又要为未来扩展预留空间。
存储消息的基本内容,一条消息可以被发送给多个用户。
-- Message body table: one row per message; recipients are linked through sys_user_message.
CREATE TABLE `sys_message` (
    `id`           BIGINT       NOT NULL AUTO_INCREMENT COMMENT '消息ID',
    `title`        VARCHAR(255) NOT NULL COMMENT '消息标题',
    `content`      TEXT         NOT NULL COMMENT '消息内容',
    `message_type` TINYINT      NOT NULL COMMENT '消息类型:1-系统通知,2-业务提醒,3-互动消息,4-营销消息',
    `template_id`  BIGINT       DEFAULT NULL COMMENT '消息模板ID,关联sys_message_template表',
    `sender_id`    VARCHAR(64)  DEFAULT NULL COMMENT '发送者ID',
    `sender_name`  VARCHAR(128) DEFAULT NULL COMMENT '发送者名称',
    `is_global`    TINYINT      NOT NULL DEFAULT '0' COMMENT '是否全局消息:0-否,1-是',
    `status`       TINYINT      NOT NULL DEFAULT '1' COMMENT '状态:0-已删除,1-正常',
    `created_time` DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `updated_time` DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    INDEX `idx_message_type` (`message_type`),
    INDEX `idx_created_time` (`created_time`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '系统消息表';
记录用户与消息的关联关系,包括已读 / 未读状态。
-- Per-recipient delivery record: links one user to one message and tracks read/deleted state.
-- Index design follows the queries issued against this table:
--   * unread count  : WHERE user_id = ? AND is_read = 0 AND status = 1
--                     -> idx_user_status_read is a covering index for the COUNT
--   * paged listing : WHERE user_id = ? AND status = 1 ORDER BY receive_time DESC
--                     -> idx_user_status_receive serves filter and sort without a filesort
--   * mark as read  : WHERE user_id = ? AND message_id IN (...) -> uk_message_user
-- A stand-alone index on is_read (two distinct values) is not selective enough to be useful,
-- and a stand-alone user_id index is redundant with the composite indexes, so both are omitted.
CREATE TABLE `sys_user_message` (
    `id`           BIGINT      NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `message_id`   BIGINT      NOT NULL COMMENT '消息ID,关联sys_message表',
    `user_id`      VARCHAR(64) NOT NULL COMMENT '用户ID',
    `is_read`      TINYINT     NOT NULL DEFAULT '0' COMMENT '是否已读:0-未读,1-已读',
    `read_time`    DATETIME    DEFAULT NULL COMMENT '阅读时间',
    `receive_time` DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '接收时间',
    `status`       TINYINT     NOT NULL DEFAULT '1' COMMENT '状态:0-已删除,1-正常',
    `created_time` DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `updated_time` DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uk_message_user` (`message_id`, `user_id`) COMMENT '确保一个用户不会收到重复消息',
    KEY `idx_user_status_read` (`user_id`, `status`, `is_read`),
    KEY `idx_user_status_receive` (`user_id`, `status`, `receive_time`),
    KEY `idx_receive_time` (`receive_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户消息关联表';
配置各种消息渠道的参数。
-- Channel registry: one row per delivery channel, keyed by a unique channel code.
CREATE TABLE `sys_message_channel` (
    `id`           BIGINT      NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `channel_code` VARCHAR(32) NOT NULL COMMENT '渠道编码:IN_APP-站内消息,SMS-短信,EMAIL-邮件,AI_CALL-AI电话',
    `channel_name` VARCHAR(64) NOT NULL COMMENT '渠道名称',
    `config`       TEXT        COMMENT '渠道配置,JSON格式',
    `is_enabled`   TINYINT     NOT NULL DEFAULT '1' COMMENT '是否启用:0-禁用,1-启用',
    `sort_order`   INT         NOT NULL DEFAULT '0' COMMENT '排序序号',
    `created_time` DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `updated_time` DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    UNIQUE INDEX `uk_channel_code` (`channel_code`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '消息渠道配置表';
存储消息模板,便于统一管理和维护消息格式。
-- Message templates: title/content patterns plus the channels a template may be sent through.
CREATE TABLE `sys_message_template` (
    `id`               BIGINT       NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `template_code`    VARCHAR(64)  NOT NULL COMMENT '模板编码',
    `template_name`    VARCHAR(128) NOT NULL COMMENT '模板名称',
    `message_type`     TINYINT      NOT NULL COMMENT '消息类型:1-系统通知,2-业务提醒,3-互动消息,4-营销消息',
    `title_template`   VARCHAR(255) NOT NULL COMMENT '标题模板,支持变量替换',
    `content_template` TEXT         NOT NULL COMMENT '内容模板,支持变量替换',
    `channels`         VARCHAR(128) NOT NULL COMMENT '支持的渠道,逗号分隔:IN_APP,SMS,EMAIL',
    `is_enabled`       TINYINT      NOT NULL DEFAULT '1' COMMENT '是否启用:0-禁用,1-启用',
    `created_time`     DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `updated_time`     DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    UNIQUE INDEX `uk_template_code` (`template_code`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '消息模板表';
记录用户对各消息渠道的偏好设置。
-- Per-user channel preferences: at most one row per (user, message type).
CREATE TABLE `sys_user_channel_preference` (
    `id`           BIGINT       NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `user_id`      VARCHAR(64)  NOT NULL COMMENT '用户ID',
    `message_type` TINYINT      NOT NULL COMMENT '消息类型:1-系统通知,2-业务提醒,3-互动消息,4-营销消息',
    `channels`     VARCHAR(128) NOT NULL COMMENT '首选渠道,逗号分隔:IN_APP,SMS,EMAIL',
    `is_enabled`   TINYINT      NOT NULL DEFAULT '1' COMMENT '是否启用:0-禁用,1-启用',
    `created_time` DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `updated_time` DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    UNIQUE INDEX `uk_user_type` (`user_id`, `message_type`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT = '用户渠道偏好设置表';
首先,我们需要配置 Maven 依赖:
<dependencies>
    <!-- Spring Boot web (Spring MVC + embedded servlet container) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>3.2.0</version>
    </dependency>
    <!-- Spring Boot WebSocket support, used for real-time message push -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
        <version>3.2.0</version>
    </dependency>
    <!-- Spring Data Redis, used for the unread-count cache -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>3.2.0</version>
    </dependency>
    <!-- MyBatis-Plus: Spring Boot 3 must use the spring-boot3 starter; the plain
         mybatis-plus-boot-starter targets Spring Boot 2 and fails to start on Boot 3.2 -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
        <version>3.5.5</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.3.0</version>
        <scope>runtime</scope>
    </dependency>
    <!-- Lombok (compile-time only) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
        <scope>provided</scope>
    </dependency>
    <!-- Spring core utilities; 6.1.1 is the Spring Framework version that Spring Boot 3.2.0 ships with -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>6.1.1</version>
    </dependency>
    <!-- Google Guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>33.1.0-jre</version>
    </dependency>
    <!-- FastJSON2 -->
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.32</version>
    </dependency>
    <!-- springdoc OpenAPI UI: 2.3.0 is the release line built against Spring Boot 3.2;
         2.1.0 predates Spring Framework 6.1 -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Hutool -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.25</version>
    </dependency>
</dependencies>
使用 MyBatis-Plus 的实体类注解,定义与数据库表对应的实体类。
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Entity mapped to the {@code sys_message} table: the content of a message,
 * independent of who receives it.
 *
 * @author ken
 */
@Data
@TableName("sys_message")
public class SysMessage {

    /** Message ID (database auto-increment). */
    @TableId(type = IdType.AUTO)
    private Long id;

    /** Message title. */
    private String title;

    /** Message content. */
    private String content;

    /** Message type: 1 = system notice, 2 = business reminder, 3 = interactive message, 4 = marketing message. */
    private Integer messageType;

    /** ID of the message template, referencing {@code sys_message_template}. */
    private Long templateId;

    /** Sender ID. */
    private String senderId;

    /** Sender display name. */
    private String senderName;

    /** Global-message flag: 0 = no, 1 = yes. */
    private Integer isGlobal;

    /** Row status: 0 = deleted, 1 = normal. */
    private Integer status;

    /** Creation time; declared as an insert-time fill field. */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /** Last update time; declared as an insert-and-update fill field. */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Entity mapped to the {@code sys_user_message} table: the link between one user
 * and one message, including that user's read state.
 *
 * @author ken
 */
@Data
@TableName("sys_user_message")
public class SysUserMessage {

    /** Row ID (database auto-increment). */
    @TableId(type = IdType.AUTO)
    private Long id;

    /** Message ID, referencing {@code sys_message}. */
    private Long messageId;

    /** ID of the receiving user. */
    private String userId;

    /** Read flag: 0 = unread, 1 = read. */
    private Integer isRead;

    /** Time the message was read. */
    private LocalDateTime readTime;

    /** Time the message was received. */
    private LocalDateTime receiveTime;

    /** Row status: 0 = deleted, 1 = normal. */
    private Integer status;

    /** Creation time; declared as an insert-time fill field. */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /** Last update time; declared as an insert-and-update fill field. */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Entity mapped to the {@code sys_message_channel} table: configuration of one delivery channel.
 *
 * @author ken
 */
@Data
@TableName("sys_message_channel")
public class SysMessageChannel {

    /** Row ID (database auto-increment). */
    @TableId(type = IdType.AUTO)
    private Long id;

    /** Channel code: IN_APP = in-app message, SMS = text message, EMAIL = e-mail, AI_CALL = AI phone call. */
    private String channelCode;

    /** Channel display name. */
    private String channelName;

    /** Channel configuration, in JSON format. */
    private String config;

    /** Enabled flag: 0 = disabled, 1 = enabled. */
    private Integer isEnabled;

    /** Sort order. */
    private Integer sortOrder;

    /** Creation time; declared as an insert-time fill field. */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /** Last update time; declared as an insert-and-update fill field. */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Entity mapped to the {@code sys_message_template} table: a reusable title/content template.
 *
 * @author ken
 */
@Data
@TableName("sys_message_template")
public class SysMessageTemplate {

    /** Row ID (database auto-increment). */
    @TableId(type = IdType.AUTO)
    private Long id;

    /** Template code. */
    private String templateCode;

    /** Template display name. */
    private String templateName;

    /** Message type: 1 = system notice, 2 = business reminder, 3 = interactive message, 4 = marketing message. */
    private Integer messageType;

    /** Title template; supports variable substitution. */
    private String titleTemplate;

    /** Content template; supports variable substitution. */
    private String contentTemplate;

    /** Supported channels as a comma-separated list, e.g. {@code IN_APP,SMS,EMAIL}. */
    private String channels;

    /** Enabled flag: 0 = disabled, 1 = enabled. */
    private Integer isEnabled;

    /** Creation time; declared as an insert-time fill field. */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /** Last update time; declared as an insert-and-update fill field. */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * Entity mapped to the {@code sys_user_channel_preference} table: the channels a user
 * prefers for one message type.
 *
 * @author ken
 */
@Data
@TableName("sys_user_channel_preference")
public class SysUserChannelPreference {

    /** Row ID (database auto-increment). */
    @TableId(type = IdType.AUTO)
    private Long id;

    /** User ID. */
    private String userId;

    /** Message type: 1 = system notice, 2 = business reminder, 3 = interactive message, 4 = marketing message. */
    private Integer messageType;

    /** Preferred channels as a comma-separated list, e.g. {@code IN_APP,SMS,EMAIL}. */
    private String channels;

    /** Enabled flag: 0 = disabled, 1 = enabled. */
    private Integer isEnabled;

    /** Creation time; declared as an insert-time fill field. */
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createdTime;

    /** Last update time; declared as an insert-and-update fill field. */
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updatedTime;
}
使用 MyBatis-Plus 的 BaseMapper 实现基本的数据访问操作。
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
/**
 * MyBatis mapper for {@link SysMessage}; declares no statements of its own and
 * relies entirely on the operations inherited from {@link BaseMapper}.
 *
 * @author ken
 */
@Mapper
public interface SysMessageMapper extends BaseMapper<SysMessage> {
}
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Map;
/**
 * MyBatis mapper for {@link SysUserMessage}. Adds three custom statements on top of
 * the operations inherited from {@link BaseMapper}.
 *
 * @author ken
 */
@Mapper
public interface SysUserMessageMapper extends BaseMapper<SysUserMessage> {

    /**
     * Pages through user-message rows together with the details of the linked message.
     *
     * @param page         paging parameters
     * @param queryWrapper filter conditions, bound under the {@link Constants#WRAPPER} parameter name
     * @return one page of results; each row is a map that includes the message details
     */
    IPage<Map<String, Object>> selectUserMessagePage(
            IPage<Map<String, Object>> page,
            @Param(Constants.WRAPPER) Wrapper<SysUserMessage> queryWrapper);

    /**
     * Counts the unread messages of a user.
     *
     * @param userId user ID
     * @return number of unread messages
     */
    Long countUnreadByUserId(@Param("userId") String userId);

    /**
     * Marks a batch of messages as read for a user.
     *
     * @param userId     user ID
     * @param messageIds IDs of the messages to mark
     * @return number of rows updated
     */
    int batchUpdateReadStatus(
            @Param("userId") String userId,
            @Param("messageIds") Iterable<Long> messageIds);
}
对应的 XML 映射文件(SysUserMessageMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.message.mapper.SysUserMessageMapper">

    <!-- Paged listing: user-message rows left-joined with message details, newest received first.
         The filter comes from the MyBatis-Plus wrapper via ${ew.customSqlSegment}. -->
    <select id="selectUserMessagePage" resultType="java.util.Map">
        SELECT sum.user_id,
               sum.message_id,
               sum.is_read,
               sum.read_time,
               sum.receive_time,
               msg.title,
               msg.content,
               msg.message_type,
               msg.sender_id,
               msg.sender_name,
               msg.created_time AS message_created_time
        FROM sys_user_message sum
        LEFT JOIN sys_message msg
            ON sum.message_id = msg.id
        ${ew.customSqlSegment}
        ORDER BY sum.receive_time DESC
    </select>

    <!-- Number of unread, non-deleted messages of one user. -->
    <select id="countUnreadByUserId" resultType="java.lang.Long">
        SELECT COUNT(*)
        FROM sys_user_message
        WHERE user_id = #{userId}
          AND is_read = 0
          AND status = 1
    </select>

    <!-- Marks the given messages of one user as read; rows already read or deleted are left untouched. -->
    <update id="batchUpdateReadStatus">
        UPDATE sys_user_message
        SET is_read = 1,
            read_time = NOW(),
            updated_time = NOW()
        WHERE user_id = #{userId}
          AND is_read = 0
          AND status = 1
          AND message_id IN
          <foreach collection="messageIds" item="id" open="(" separator="," close=")">
              #{id}
          </foreach>
    </update>

</mapper>
其他 Mapper 接口的实现类似,这里不再一一列出。
定义一些常用的枚举类,提高代码的可读性和可维护性。
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Message types, each with the numeric code that is stored in the {@code message_type} columns.
 *
 * @author ken
 */
@Getter
@AllArgsConstructor
public enum MessageTypeEnum {

    /** System notice. */
    SYSTEM_NOTICE(1, "系统通知"),

    /** Business reminder. */
    BUSINESS_REMINDER(2, "业务提醒"),

    /** Interactive message. */
    INTERACTIVE_MESSAGE(3, "互动消息"),

    /** Marketing message. */
    MARKETING_MESSAGE(4, "营销消息");

    /** Numeric type code. */
    private final Integer code;

    /** Display name of the type. */
    private final String name;

    /**
     * Looks up the enum constant for a type code.
     *
     * @param code type code; may be {@code null}
     * @return the matching constant, or {@code null} when the code is {@code null} or unknown
     */
    public static MessageTypeEnum getByCode(Integer code) {
        // A plain null check keeps this enum free of any utility-class dependency.
        if (code == null) {
            return null;
        }
        for (MessageTypeEnum type : values()) {
            if (type.code.equals(code)) {
                return type;
            }
        }
        return null;
    }
}
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Delivery channels, each with the string code used in channel configuration and templates.
 *
 * @author ken
 */
@Getter
@AllArgsConstructor
public enum MessageChannelEnum {

    /** In-app message. */
    IN_APP("IN_APP", "站内消息"),

    /** SMS text message. */
    SMS("SMS", "短信"),

    /** E-mail. */
    EMAIL("EMAIL", "邮件"),

    /** AI phone call. */
    AI_CALL("AI_CALL", "AI电话");

    /** Channel code. */
    private final String code;

    /** Display name of the channel. */
    private final String name;

    /**
     * Looks up the enum constant for a channel code.
     *
     * @param code channel code; may be {@code null} or empty
     * @return the matching constant, or {@code null} when the code is {@code null}, empty or unknown
     */
    public static MessageChannelEnum getByCode(String code) {
        // Plain Java check: no utility-class dependency is needed for a null/empty test.
        if (code == null || code.isEmpty()) {
            return null;
        }
        for (MessageChannelEnum channel : values()) {
            if (channel.code.equals(code)) {
                return channel;
            }
        }
        return null;
    }
}
为了支持多种消息渠道,我们定义一个统一的消息发送接口:
import com.example.message.vo.MessageSendVO;
/**
 * Contract implemented by every delivery channel.
 *
 * @author ken
 */
public interface MessageChannel {

    /**
     * Identifies this channel.
     *
     * @return the channel code
     */
    String getChannelCode();

    /**
     * Delivers a message through this channel.
     *
     * @param message the message to send
     * @return {@code true} when sending succeeded, {@code false} when it failed
     */
    boolean send(MessageSendVO message);
}
import com.example.message.channel.MessageChannel;
import com.example.message.entity.SysMessage;
import com.example.message.entity.SysUserMessage;
import com.example.message.mapper.SysMessageMapper;
import com.example.message.mapper.SysUserMessageMapper;
import com.example.message.service.NotifyService;
import com.example.message.vo.MessageSendVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
/**
* 站内消息渠道实现
*
* @author ken
*/
@Slf4j
@Component
public class InAppMessageChannel implements MessageChannel {
private final SysMessageMapper messageMapper;
private final SysUserMessageMapper userMessageMapper;
private final NotifyService notifyService;
public InAppMessageChannel(SysMessageMapper messageMapper,
SysUserMessageMapper userMessageMapper,
NotifyService notifyService) {
this.messageMapper = messageMapper;
this.userMessageMapper = userMessageMapper;
this.notifyService = notifyService;
}
@Override
public String getChannelCode() {
return "IN_APP";
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean send(MessageSendVO message) {
log.info("通过站内消息渠道发送消息,接收用户:{},消息标题:{}",
message.getReceiverIds(), message.getTitle());
try {
// 1. 保存消息主表
SysMessage sysMessage = new SysMessage();
sysMessage.setTitle(message.getTitle());
sysMessage.setContent(message.getContent());
sysMessage.setMessageType(message.getMessageType());
sysMessage.setTemplateId(message.getTemplateId());
sysMessage.setSenderId(message.getSenderId());
sysMessage.setSenderName(message.getSenderName());
sysMessage.setIsGlobal(CollectionUtils.isEmpty(message.getReceiverIds()) ? 1 : 0);
sysMessage.setStatus(1);
messageMapper.insert(sysMessage);
// 如果是全局消息,这里可以有专门的处理逻辑,比如异步批量插入所有用户
// 这里简化处理,只处理指定用户
if (!CollectionUtils.isEmpty(message.getReceiverIds())) {
// 2. 批量保存用户消息关联
List<SysUserMessage> userMessages = message.getReceiverIds().stream()
.map(receiverId -> {
SysUserMessage userMessage = new SysUserMessage();
userMessage.setMessageId(sysMessage.getId());
userMessage.setUserId(receiverId);
userMessage.setIsRead(0);
userMessage.setReceiveTime(LocalDateTime.now());
userMessage.setStatus(1);
return userMessage;
})
.collect(Collectors.toList());
// 批量插入
if (!CollectionUtils.isEmpty(userMessages)) {
userMessageMapper.batchInsert(userMessages);
}
// 3. 推送实时通知
notifyService.notifyNewMessage(sysMessage.getId(), message.getReceiverIds());
}
log.info("站内消息发送成功,消息ID:{}", sysMessage.getId());
return true;
} catch (Exception e) {
log.error("站内消息发送失败", e);
return false;
}
}
}
渠道服务用于管理和选择合适的消息渠道:
import com.example.message.channel.MessageChannel;
import com.example.message.entity.SysUserChannelPreference;
import com.example.message.enums.MessageChannelEnum;
import com.example.message.mapper.SysUserChannelPreferenceMapper;
import com.example.message.vo.MessageSendVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 消息渠道服务
*
* @author ken
*/
@Slf4j
@Service
public class MessageChannelService {
private final Map<String, MessageChannel> channelMap;
private final SysUserChannelPreferenceMapper userChannelPreferenceMapper;
/**
* 构造函数,自动注入所有MessageChannel实现
*
* @param channels 消息渠道列表
* @param userChannelPreferenceMapper 用户渠道偏好Mapper
*/
public MessageChannelService(List<MessageChannel> channels,
SysUserChannelPreferenceMapper userChannelPreferenceMapper) {
this.channelMap = channels.stream()
.collect(Collectors.toMap(MessageChannel::getChannelCode, channel -> channel));
this.userChannelPreferenceMapper = userChannelPreferenceMapper;
}
/**
* 获取所有可用的消息渠道
*
* @return 消息渠道列表
*/
public List<MessageChannel> getAllChannels() {
return Lists.newArrayList(channelMap.values());
}
/**
* 根据渠道编码获取消息渠道
*
* @param channelCode 渠道编码
* @return 消息渠道,不存在则返回null
*/
public MessageChannel getChannelByCode(String channelCode) {
if (StringUtils.isEmpty(channelCode)) {
return null;
}
return channelMap.get(channelCode);
}
/**
* 为消息选择合适的渠道并发送
*
* @param message 消息内容
* @return 发送结果
*/
public boolean sendMessage(MessageSendVO message) {
log.info("开始发送消息,标题:{},接收用户:{}", message.getTitle(), message.getReceiverIds());
// 1. 获取消息模板指定的渠道
List<String> candidateChannels = Lists.newArrayList();
if (StringUtils.hasText(message.getChannels())) {
candidateChannels = Arrays.asList(message.getChannels().split(","));
} else {
// 如果没有指定渠道,默认使用站内消息
candidateChannels.add(MessageChannelEnum.IN_APP.getCode());
}
// 2. 过滤掉不可用的渠道
List<String> availableChannels = candidateChannels.stream()
.filter(channelCode -> channelMap.containsKey(channelCode))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(availableChannels)) {
log.error("没有可用的消息渠道,消息发送失败");
return false;
}
// 3. 对每个接收用户,根据其偏好选择渠道发送
if (!CollectionUtils.isEmpty(message.getReceiverIds())) {
// 批量获取用户的渠道偏好
Map<String, String> userChannelMap = getUserChannelPreferences(
message.getReceiverIds(), message.getMessageType());
// 对每个用户单独处理
for (String userId : message.getReceiverIds()) {
// 获取用户偏好的渠道
String userChannels = userChannelMap.getOrDefault(userId, String.join(",", availableChannels));
List<String> userChannelList = Arrays.asList(userChannels.split(","));
// 选择用户偏好的第一个可用渠道
String selectedChannel = userChannelList.stream()
.filter(availableChannels::contains)
.findFirst()
.orElse(availableChannels.get(0));
// 发送消息
MessageChannel channel = getChannelByCode(selectedChannel);
if (ObjectUtils.isEmpty(channel)) {
log.warn("消息渠道不存在,渠道编码:{}", selectedChannel);
continue;
}
// 创建用户专属的消息对象
MessageSendVO userMessage = new MessageSendVO();
BeanUtils.copyProperties(message, userMessage);
userMessage.setReceiverIds(Lists.newArrayList(userId));
// 发送消息
boolean success = channel.send(userMessage);
if (!success) {
log.error("用户消息发送失败,用户ID:{},渠道:{}", userId, selectedChannel);
}
}
} else if (message.getIsGlobal()) {
// 全局消息,使用默认渠道发送
for (String channelCode : availableChannels) {
MessageChannel channel = getChannelByCode(channelCode);
if (ObjectUtils.isEmpty(channel)) {
log.warn("消息渠道不存在,渠道编码:{}", channelCode);
continue;
}
boolean success = channel.send(message);
if (!success) {
log.error("全局消息发送失败,渠道:{}", channelCode);
}
}
}
return true;
}
/**
* 批量获取用户的渠道偏好
*
* @param userIds 用户ID列表
* @param messageType 消息类型
* @return 用户ID到渠道列表的映射
*/
private Map<String, String> getUserChannelPreferences(List<String> userIds, Integer messageType) {
if (CollectionUtils.isEmpty(userIds) || ObjectUtils.isEmpty(messageType)) {
return Maps.newHashMap();
}
try {
// 查询用户的渠道偏好
LambdaQueryWrapper<SysUserChannelPreference> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.in(SysUserChannelPreference::getUserId, userIds)
.eq(SysUserChannelPreference::getMessageType, messageType)
.eq(SysUserChannelPreference::getIsEnabled, 1);
List<SysUserChannelPreference> preferences = userChannelPreferenceMapper.selectList(queryWrapper);
if (CollectionUtils.isEmpty(preferences)) {
return Maps.newHashMap();
}
// 转换为Map
return preferences.stream()
.collect(Collectors.toMap(
SysUserChannelPreference::getUserId,
SysUserChannelPreference::getChannels,
(existing, replacement) -> existing // 处理重复用户ID的情况
));
} catch (Exception e) {
log.error("获取用户渠道偏好失败", e);
return Maps.newHashMap();
}
}
}
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.message.entity.SysMessageTemplate;
import com.example.message.entity.SysUserMessage;
import com.example.message.mapper.SysMessageTemplateMapper;
import com.example.message.mapper.SysUserMessageMapper;
import com.example.message.service.MessageChannelService;
import com.example.message.service.MessageService;
import com.example.message.vo.MessagePageVO;
import com.example.message.vo.MessageSendVO;
import com.example.message.vo.MessageVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 消息服务实现
*
* @author ken
*/
@Slf4j
@Service
public class MessageServiceImpl implements MessageService {
private final MessageChannelService channelService;
private final SysMessageTemplateMapper templateMapper;
private final SysUserMessageMapper userMessageMapper;
private final RedisTemplate<String, Object> redisTemplate;
/**
* 未读消息数量缓存键前缀
*/
private static final String UNREAD_COUNT_PREFIX = "message:unread:count:";
public MessageServiceImpl(MessageChannelService channelService,
SysMessageTemplateMapper templateMapper,
SysUserMessageMapper userMessageMapper,
RedisTemplate<String, Object> redisTemplate) {
this.channelService = channelService;
this.templateMapper = templateMapper;
this.userMessageMapper = userMessageMapper;
this.redisTemplate = redisTemplate;
}
@Override
public boolean sendMessage(MessageSendVO message) {
log.info("发送消息,标题:{},接收用户数量:{}",
message.getTitle(), CollectionUtils.size(message.getReceiverIds()));
// 参数校验
StringUtils.hasText(message.getTitle(), "消息标题不能为空");
StringUtils.hasText(message.getContent(), "消息内容不能为空");
ObjectUtils.isEmpty(message.getMessageType(), "消息类型不能为空");
// 如果指定了模板ID,则使用模板内容覆盖
if (!ObjectUtils.isEmpty(message.getTemplateId())) {
SysMessageTemplate template = templateMapper.selectById(message.getTemplateId());
if (!ObjectUtils.isEmpty(template)) {
message.setTitle(template.getTitleTemplate());
message.setContent(template.getContentTemplate());
message.setChannels(template.getChannels());
if (ObjectUtils.isEmpty(message.getMessageType())) {
message.setMessageType(template.getMessageType());
}
}
}
// 替换消息内容中的变量
if (!CollectionUtils.isEmpty(message.getVariables())) {
String processedTitle = replaceVariables(message.getTitle(), message.getVariables());
String processedContent = replaceVariables(message.getContent(), message.getVariables());
message.setTitle(processedTitle);
message.setContent(processedContent);
}
// 通过渠道服务发送消息
return channelService.sendMessage(message);
}
@Override
public boolean sendMessageByTemplate(String templateCode, MessageSendVO message) {
log.info("通过模板发送消息,模板编码:{},接收用户数量:{}",
templateCode, CollectionUtils.size(message.getReceiverIds()));
// 参数校验
StringUtils.hasText(templateCode, "模板编码不能为空");
// 查询模板
LambdaQueryWrapper<SysMessageTemplate> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(SysMessageTemplate::getTemplateCode, templateCode)
.eq(SysMessageTemplate::getIsEnabled, 1);
SysMessageTemplate template = templateMapper.selectOne(queryWrapper);
if (ObjectUtils.isEmpty(template)) {
log.error("消息模板不存在或已禁用,模板编码:{}", templateCode);
throw new IllegalArgumentException("消息模板不存在或已禁用");
}
// 设置模板信息
message.setTemplateId(template.getId());
message.setMessageType(template.getMessageType());
message.setChannels(template.getChannels());
// 发送消息
return sendMessage(message);
}
@Override
public IPage<MessageVO> getUserMessagePage(MessagePageVO pageVO) {
log.info("查询用户消息列表,用户ID:{},页码:{},每页大小:{}",
pageVO.getUserId(), pageVO.getPageNum(), pageVO.getPageSize());
// 参数校验
StringUtils.hasText(pageVO.getUserId(), "用户ID不能为空");
if (pageVO.getPageNum() <= 0) {
pageVO.setPageNum(1);
}
if (pageVO.getPageSize() <= 0 || pageVO.getPageSize() > 100) {
pageVO.setPageSize(20);
}
// 构建分页参数
Page<Map<String, Object>> page = new Page<>(pageVO.getPageNum(), pageVO.getPageSize());
// 构建查询条件
LambdaQueryWrapper<SysUserMessage> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(SysUserMessage::getUserId, pageVO.getUserId())
.eq(SysUserMessage::getStatus, 1);
// 消息类型过滤
if (!ObjectUtils.isEmpty(pageVO.getMessageType())) {
queryWrapper.eq(SysUserMessage::getMessageType, pageVO.getMessageType());
}
// 已读/未读过滤
if (!ObjectUtils.isEmpty(pageVO.getIsRead())) {
queryWrapper.eq(SysUserMessage::getIsRead, pageVO.getIsRead());
}
// 时间范围过滤
if (!ObjectUtils.isEmpty(pageVO.getStartTime())) {
queryWrapper.ge(SysUserMessage::getReceiveTime, pageVO.getStartTime());
}
if (!ObjectUtils.isEmpty(pageVO.getEndTime())) {
queryWrapper.le(SysUserMessage::getReceiveTime, pageVO.getEndTime());
}
// 执行查询
IPage<Map<String, Object>> resultPage = userMessageMapper.selectUserMessagePage(page, queryWrapper);
// 转换结果
return resultPage.convert(this::convertToMessageVO);
}
@Override
public Long getUnreadCount(String userId) {
log.info("查询用户未读消息数量,用户ID:{}", userId);
// 参数校验
StringUtils.hasText(userId, "用户ID不能为空");
try {
// 先从缓存获取
String cacheKey = UNREAD_COUNT_PREFIX + userId;
Object cacheValue = redisTemplate.opsForValue().get(cacheKey);
if (!ObjectUtils.isEmpty(cacheValue)) {
return Long.parseLong(cacheValue.toString());
}
// 缓存未命中,从数据库查询
Long unreadCount = userMessageMapper.countUnreadByUserId(userId);
if (ObjectUtils.isEmpty(unreadCount)) {
unreadCount = 0L;
}
// 存入缓存,设置1小时过期
redisTemplate.opsForValue().set(cacheKey, unreadCount, 1, TimeUnit.HOURS);
return unreadCount;
} catch (Exception e) {
log.error("查询用户未读消息数量失败,用户ID:{}", userId, e);
// 异常情况下直接查询数据库
Long unreadCount = userMessageMapper.countUnreadByUserId(userId);
return ObjectUtils.isEmpty(unreadCount) ? 0L : unreadCount;
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean markAsRead(String userId, List<Long> messageIds) {
log.info("标记消息为已读,用户ID:{},消息数量:{}", userId, CollectionUtils.size(messageIds));
// 参数校验
StringUtils.hasText(userId, "用户ID不能为空");
if (CollectionUtils.isEmpty(messageIds)) {
log.warn("消息ID列表为空,无需标记已读");
return true;
}
try {
// 批量更新已读状态
int updateCount = userMessageMapper.batchUpdateReadStatus(userId, messageIds);
log.info("标记消息为已读完成,用户ID:{},更新数量:{}", userId, updateCount);
// 清除未读计数缓存
clearUnreadCountCache(userId);
return updateCount > 0;
} catch (Exception e) {
log.error("标记消息为已读失败,用户ID:{}", userId, e);
throw new RuntimeException("标记消息为已读失败", e);
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean markAllAsRead(String userId) {
log.info("标记所有消息为已读,用户ID:{}", userId);
// 参数校验
StringUtils.hasText(userId, "用户ID不能为空");
try {
// 查询所有未读消息
LambdaQueryWrapper<SysUserMessage> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(SysUserMessage::getUserId, userId)
.eq(SysUserMessage::getIsRead, 0)
.eq(SysUserMessage::getStatus, 1);
List<SysUserMessage> unreadMessages = userMessageMapper.selectList(queryWrapper);
if (CollectionUtils.isEmpty(unreadMessages)) {
log.info("用户没有未读消息,无需标记已读,用户ID:{}", userId);
return true;
}
// 提取消息ID
List<Long> messageIds = unreadMessages.stream()
.map(SysUserMessage::getMessageId)
.collect(Collectors.toList());
// 批量更新已读状态
int updateCount = userMessageMapper.batchUpdateReadStatus(userId, messageIds);
log.info("标记所有消息为已读完成,用户ID:{},更新数量:{}", userId, updateCount);
// 清除未读计数缓存
clearUnreadCountCache(userId);
return updateCount > 0;
} catch (Exception e) {
log.error("标记所有消息为已读失败,用户ID:{}", userId, e);
throw new RuntimeException("标记所有消息为已读失败", e);
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean deleteMessage(String userId, List<Long> messageIds) {
log.info("删除消息,用户ID:{},消息数量:{}", userId, CollectionUtils.size(messageIds));
// 参数校验
StringUtils.hasText(userId, "用户ID不能为空");
if (CollectionUtils.isEmpty(messageIds)) {
log.warn("消息ID列表为空,无需删除");
return true;
}
try {
// 逻辑删除消息(更新状态为0)
LambdaUpdateWrapper<SysUserMessage> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.eq(SysUserMessage::getUserId, userId)
.in(SysUserMessage::getMessageId, messageIds)
.set(SysUserMessage::getStatus, 0);
int deleteCount = userMessageMapper.update(null, updateWrapper);
log.info("删除消息完成,用户ID:{},删除数量:{}", userId, deleteCount);
// 清除未读计数缓存
clearUnreadCountCache(userId);
return deleteCount > 0;
} catch (Exception e) {
log.error("删除消息失败,用户ID:{}", userId, e);
throw new RuntimeException("删除消息失败", e);
}
}
/**
* 清除用户未读消息数量缓存
*
* @param userId 用户ID
*/
public void clearUnreadCountCache(String userId) {
if (StringUtils.hasText(userId)) {
String cacheKey = UNREAD_COUNT_PREFIX + userId;
redisTemplate.delete(cacheKey);
log.info("清除用户未读消息数量缓存,用户ID:{},缓存键:{}", userId, cacheKey);
}
}
/**
* 替换消息内容中的变量
*
* @param content 原始内容
* @param variables 变量映射
* @return 替换后的内容
*/
private String replaceVariables(String content, Map<String, Object> variables) {
if (StringUtils.isEmpty(content) || CollectionUtils.isEmpty(variables)) {
return content;
}
String processedContent = content;
for (Map.Entry<String, Object> entry : variables.entrySet()) {
String placeholder = "${" + entry.getKey() + "}";
String value = entry.getValue() != null ? entry.getValue().toString() : "";
processedContent = processedContent.replace(placeholder, value);
}
return processedContent;
}
/**
* 将Map转换为MessageVO
*
* @param map 数据库查询结果Map
* @return MessageVO对象
*/
private MessageVO convertToMessageVO(Map<String, Object> map) {
if (CollectionUtils.isEmpty(map)) {
return null;
}
MessageVO messageVO = new MessageVO();
messageVO.setMessageId(ObjectUtils.isEmpty(map.get("message_id")) ? null :
Long.parseLong(map.get("message_id").toString()));
messageVO.setUserId(map.get("user_id") != null ? map.get("user_id").toString() : null);
messageVO.setTitle(map.get("title") != null ? map.get("title").toString() : null);
messageVO.setContent(map.get("content") != null ? map.get("content").toString() : null);
messageVO.setMessageType(ObjectUtils.isEmpty(map.get("message_type")) ? null :
Integer.parseInt(map.get("message_type").toString()));
messageVO.setSenderId(map.get("sender_id") != null ? map.get("sender_id").toString() : null);
messageVO.setSenderName(map.get("sender_name") != null ? map.get("sender_name").toString() : null);
messageVO.setIsRead(ObjectUtils.isEmpty(map.get("is_read")) ? null :
Integer.parseInt(map.get("is_read").toString()));
messageVO.setReadTime(map.get("read_time") != null ? (LocalDateTime) map.get("read_time") : null);
messageVO.setReceiveTime(map.get("receive_time") != null ? (LocalDateTime) map.get("receive_time") : null);
messageVO.setCreatedTime(map.get("message_created_time") != null ? (LocalDateTime) map.get("message_created_time") : null);
return messageVO;
}
}
为了实现前端实时显示新消息的功能,我们使用 WebSocket 技术:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
 * WebSocket configuration: registers the message handler on {@code /ws/message}
 * together with the handshake interceptor that identifies the connecting user.
 *
 * @author ken
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    private final MessageWebSocketHandler messageWebSocketHandler;
    private final HandshakeInterceptor handshakeInterceptor;

    public WebSocketConfig(MessageWebSocketHandler messageWebSocketHandler,
                           HandshakeInterceptor handshakeInterceptor) {
        this.messageWebSocketHandler = messageWebSocketHandler;
        this.handshakeInterceptor = handshakeInterceptor;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // NOTE(review): "*" accepts handshakes from any origin. Combined with an
        // identity taken from a request parameter this allows cross-site connections;
        // restrict to the application's own origins before production use.
        registry.addHandler(messageWebSocketHandler, "/ws/message")
                .addInterceptors(handshakeInterceptor)
                .setAllowedOrigins("*");
    }

    /**
     * Handshake interceptor used to identify the connecting user.
     * <p>
     * Declared {@code static} on purpose: this class receives the interceptor through
     * its own constructor, and an instance-level factory method would need this class
     * to be instantiated first, which is a circular reference. A static factory method
     * can be invoked without an instance of the configuration class.
     *
     * @return the handshake interceptor
     */
    @Bean
    public static HandshakeInterceptor handshakeInterceptor() {
        return new MessageHandshakeInterceptor();
    }
}
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpSession;
import java.util.Map;
/**
 * Handshake interceptor for the message WebSocket endpoint.
 * <p>
 * Reads the {@code userId} request parameter and stores it in the WebSocket session
 * attributes under the key {@code "userId"}; the handshake is refused when the
 * parameter is missing or blank.
 *
 * @author ken
 */
public class MessageHandshakeInterceptor implements HandshakeInterceptor {
    /**
     * Runs before the handshake and decides whether it may proceed.
     *
     * @param request    the handshake request
     * @param response   the handshake response
     * @param wsHandler  the target WebSocket handler
     * @param attributes attributes that are copied onto the WebSocket session
     * @return {@code true} to continue the handshake, {@code false} to refuse it
     * @throws Exception declared by the interface; not thrown by this implementation
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (!(request instanceof ServletServerHttpRequest)) {
            return false;
        }
        ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
        // SECURITY NOTE: the user ID is taken from a client-supplied request parameter,
        // so any client can claim any identity. A real deployment should derive it from
        // the authenticated login (session, token, ...) instead.
        String userId = servletRequest.getServletRequest().getParameter("userId");
        if (userId == null || userId.trim().isEmpty()) {
            // A blank ID cannot identify anyone; refuse it rather than register it.
            return false;
        }
        // Expose the user ID to the WebSocket handler through the session attributes.
        attributes.put("userId", userId);
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {
        // Nothing to do once the handshake has completed.
    }
}
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * WebSocket handler that keeps track of connected users and pushes messages to them.
 * <p>
 * NOTE(review): only one session is tracked per user ID. A second connection for the
 * same user replaces the first in the registry; the earlier session stays open but no
 * longer receives pushes.
 *
 * @author ken
 */
@Slf4j
@Component
public class MessageWebSocketHandler extends TextWebSocketHandler {
    /**
     * Maps a user ID to that user's most recently established session.
     */
    private final Map<String, WebSocketSession> userSessionMap = new ConcurrentHashMap<>();

    /**
     * Registers the session under the user ID stored in its attributes.
     *
     * @param session the newly established session
     * @throws Exception declared by the base class
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        if (userId != null) {
            userSessionMap.put(userId, session);
            log.info("WebSocket连接建立,用户ID:{},当前在线用户数:{}", userId, userSessionMap.size());
        }
    }

    /**
     * Logs an inbound text frame and answers with a plain-text acknowledgement.
     *
     * @param session the session the frame arrived on
     * @param message the received frame
     * @throws Exception if the acknowledgement cannot be written
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        log.info("收到用户消息,用户ID:{},消息内容:{}", userId, message.getPayload());
        // Client frames (e.g. receipts) could be processed here; for now just acknowledge.
        sendSerialized(session, new TextMessage("服务器已收到消息:" + message.getPayload()));
    }

    /**
     * Unregisters the session when its connection closes.
     *
     * @param session the closed session
     * @param status  the close status
     * @throws Exception declared by the base class
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        String userId = (String) session.getAttributes().get("userId");
        if (userId != null) {
            // Remove the entry only if it still points at THIS session. If the user has
            // reconnected in the meantime, the newer session must stay registered.
            userSessionMap.remove(userId, session);
            log.info("WebSocket连接关闭,用户ID:{},当前在线用户数:{}", userId, userSessionMap.size());
        }
    }

    /**
     * Sends a message, serialised as JSON, to a single user.
     *
     * @param userId  target user ID
     * @param message payload object
     * @return {@code true} if the frame was written; {@code false} if an argument is
     *         null, the user has no open session, or the write failed
     */
    public boolean sendMessageToUser(String userId, Object message) {
        if (userId == null || message == null) {
            return false;
        }
        WebSocketSession session = userSessionMap.get(userId);
        if (session == null || !session.isOpen()) {
            log.warn("用户WebSocket连接不存在或已关闭,用户ID:{}", userId);
            return false;
        }
        try {
            String jsonMessage = JSON.toJSONString(message);
            sendSerialized(session, new TextMessage(jsonMessage));
            log.info("向用户发送WebSocket消息成功,用户ID:{},消息内容:{}", userId, jsonMessage);
            return true;
        } catch (IOException | IllegalStateException e) {
            // IllegalStateException covers a session that closed after the isOpen() check.
            log.error("向用户发送WebSocket消息失败,用户ID:{}", userId, e);
            return false;
        }
    }

    /**
     * Sends the same message, serialised once as JSON, to several users.
     * Users without an open session are skipped with a warning; a failure for one
     * user does not stop delivery to the others.
     *
     * @param userIds target user IDs
     * @param message payload object
     */
    public void sendMessageToUsers(Set<String> userIds, Object message) {
        if (userIds == null || userIds.isEmpty() || message == null) {
            return;
        }
        String jsonMessage = JSON.toJSONString(message);
        TextMessage textMessage = new TextMessage(jsonMessage);
        for (String userId : userIds) {
            WebSocketSession session = userSessionMap.get(userId);
            if (session != null && session.isOpen()) {
                try {
                    sendSerialized(session, textMessage);
                    log.info("向用户发送WebSocket消息成功,用户ID:{}", userId);
                } catch (IOException | IllegalStateException e) {
                    log.error("向用户发送WebSocket消息失败,用户ID:{}", userId, e);
                }
            } else {
                log.warn("用户WebSocket连接不存在或已关闭,用户ID:{}", userId);
            }
        }
    }

    /**
     * Returns the IDs of the users that currently have a registered session.
     *
     * @return a read-only view of the online user IDs
     */
    public Set<String> getOnlineUserIds() {
        // Read-only wrapper: removing from the raw key set would silently drop sessions.
        return java.util.Collections.unmodifiableSet(userSessionMap.keySet());
    }

    /**
     * Writes one frame while holding the session's monitor, so that pushes issued from
     * different threads to the same session never overlap.
     */
    private void sendSerialized(WebSocketSession session, TextMessage message) throws IOException {
        synchronized (session) {
            session.sendMessage(message);
        }
    }
}
import com.example.message.entity.SysMessage;
import com.example.message.mapper.SysMessageMapper;
import com.example.message.vo.NotificationVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Notification service that pushes real-time "new message" events to clients.
 *
 * @author ken
 */
@Slf4j
@Service
public class NotifyService {
    private final MessageWebSocketHandler webSocketHandler;
    private final SysMessageMapper messageMapper;
    private final MessageService messageService;

    public NotifyService(MessageWebSocketHandler webSocketHandler,
                         SysMessageMapper messageMapper,
                         MessageService messageService) {
        this.webSocketHandler = webSocketHandler;
        this.messageMapper = messageMapper;
        this.messageService = messageService;
    }

    /**
     * Tells the given users that a new message is available.
     * <p>
     * Loads the message, clears each recipient's unread-count cache and then pushes a
     * {@code NEW_MESSAGE} notification over WebSocket. Failures are logged and never
     * propagated to the caller.
     *
     * @param messageId ID of the message that was just created
     * @param userIds   recipients; duplicate and null entries are ignored
     */
    public void notifyNewMessage(Long messageId, List<String> userIds) {
        // Validate before touching userIds: logging its size first would throw on null.
        if (ObjectUtils.isEmpty(messageId) || userIds == null || userIds.isEmpty()) {
            log.warn("消息ID或用户列表为空,无法发送通知");
            return;
        }
        log.info("通知用户有新消息,消息ID:{},用户数量:{}", messageId, userIds.size());
        try {
            // Load the message details.
            SysMessage message = messageMapper.selectById(messageId);
            if (ObjectUtils.isEmpty(message)) {
                log.error("消息不存在,无法发送通知,消息ID:{}", messageId);
                return;
            }
            // Build the notification payload.
            NotificationVO notification = new NotificationVO();
            notification.setType("NEW_MESSAGE");
            notification.setMessageId(messageId);
            notification.setTitle(message.getTitle());
            notification.setContent(message.getContent());
            notification.setSendTime(message.getCreatedTime());
            // De-duplicate and drop null IDs, which cannot be looked up as sessions.
            Set<String> userIdSet = userIds.stream()
                    .filter(userId -> userId != null)
                    .collect(Collectors.toSet());
            // Clear the unread-count cache BEFORE pushing: the client typically asks for
            // the unread count as soon as the notification arrives and must not be served
            // the stale cached value.
            for (String userId : userIdSet) {
                messageService.clearUnreadCountCache(userId);
            }
            // Push the WebSocket notification.
            webSocketHandler.sendMessageToUsers(userIdSet, notification);
        } catch (Exception e) {
            log.error("通知用户新消息失败,消息ID:{}", messageId, e);
        }
    }
}
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.example.message.service.MessageService;
import com.example.message.vo.MessagePageVO;
import com.example.message.vo.MessageSendVO;
import com.example.message.vo.MessageVO;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
/**
 * REST controller for in-app messages: sending, listing, unread count, mark-as-read
 * and deletion. Every endpoint answers with a map that contains at least a
 * {@code success} flag.
 * <p>
 * SECURITY NOTE: the endpoints take the user ID from a request parameter, so a caller
 * can act on another user's messages. A real deployment should take the user from the
 * authenticated principal instead.
 *
 * @author ken
 */
@Slf4j
@RestController
@RequestMapping("/api/message")
@Tag(name = "消息接口", description = "提供消息发送、查询、已读等功能")
public class MessageController {
    private final MessageService messageService;

    public MessageController(MessageService messageService) {
        this.messageService = messageService;
    }

    @PostMapping("/send")
    @Operation(summary = "发送消息", description = "直接发送一条消息给指定用户")
    public Map<String, Object> sendMessage(@RequestBody MessageSendVO message) {
        boolean success = messageService.sendMessage(message);
        return operationResult(success, success ? "消息发送成功" : "消息发送失败");
    }

    @PostMapping("/sendByTemplate")
    @Operation(summary = "通过模板发送消息", description = "使用指定的消息模板发送消息")
    public Map<String, Object> sendMessageByTemplate(
            @Parameter(description = "模板编码", required = true) @RequestParam String templateCode,
            @RequestBody MessageSendVO message) {
        boolean success = messageService.sendMessageByTemplate(templateCode, message);
        return operationResult(success, success ? "消息发送成功" : "消息发送失败");
    }

    @GetMapping("/list")
    @Operation(summary = "查询用户消息列表", description = "分页查询用户的消息列表,支持筛选条件")
    public Map<String, Object> getUserMessageList(MessagePageVO pageVO) {
        IPage<MessageVO> messagePage = messageService.getUserMessagePage(pageVO);
        Map<String, Object> result = dataResult(messagePage.getRecords());
        result.put("total", messagePage.getTotal());
        result.put("pages", messagePage.getPages());
        result.put("current", messagePage.getCurrent());
        result.put("size", messagePage.getSize());
        return result;
    }

    @GetMapping("/unread/count")
    @Operation(summary = "查询未读消息数量", description = "查询指定用户的未读消息总数")
    public Map<String, Object> getUnreadCount(
            @Parameter(description = "用户ID", required = true) @RequestParam String userId) {
        Long unreadCount = messageService.getUnreadCount(userId);
        return dataResult(unreadCount);
    }

    @PostMapping("/read")
    @Operation(summary = "标记消息为已读", description = "将指定的消息标记为已读状态")
    public Map<String, Object> markAsRead(
            @Parameter(description = "用户ID", required = true) @RequestParam String userId,
            @Parameter(description = "消息ID列表", required = true) @RequestBody List<Long> messageIds) {
        boolean success = messageService.markAsRead(userId, messageIds);
        return operationResult(success, success ? "标记已读成功" : "标记已读失败");
    }

    @PostMapping("/read/all")
    @Operation(summary = "标记所有消息为已读", description = "将用户的所有未读消息标记为已读状态")
    public Map<String, Object> markAllAsRead(
            @Parameter(description = "用户ID", required = true) @RequestParam String userId) {
        boolean success = messageService.markAllAsRead(userId);
        return operationResult(success, success ? "标记所有已读成功" : "标记所有已读失败");
    }

    @PostMapping("/delete")
    @Operation(summary = "删除消息", description = "删除用户的指定消息")
    public Map<String, Object> deleteMessage(
            @Parameter(description = "用户ID", required = true) @RequestParam String userId,
            @Parameter(description = "消息ID列表", required = true) @RequestBody List<Long> messageIds) {
        boolean success = messageService.deleteMessage(userId, messageIds);
        return operationResult(success, success ? "删除消息成功" : "删除消息失败");
    }

    /**
     * Builds the response for a command-style endpoint: {@code success} plus a
     * human-readable {@code message}.
     */
    private static Map<String, Object> operationResult(boolean success, String message) {
        // java.util.HashMap is spelled out because this class's import list contains
        // neither HashMap nor the Guava Maps helper that was used here before.
        Map<String, Object> result = new java.util.HashMap<>();
        result.put("success", success);
        result.put("message", message);
        return result;
    }

    /**
     * Builds the response for a query-style endpoint: {@code success = true} plus the
     * payload under {@code data}. The returned map is mutable so that callers can add
     * paging fields.
     */
    private static Map<String, Object> dataResult(Object data) {
        Map<String, Object> result = new java.util.HashMap<>();
        result.put("success", true);
        result.put("data", data);
        return result;
    }
}
为了完整展示实时通知功能,我们提供一个简单的前端 WebSocket 连接示例:
// Opens the message WebSocket for the given user and wires up its callbacks.
// Returns the WebSocket instance, or null when the browser has no WebSocket support.
// When the connection closes, a new one is attempted after 5 seconds; the instance
// returned here is not updated by such a reconnect.
function connectWebSocket(userId) {
    // Bail out on browsers without WebSocket support.
    if (!window.WebSocket) {
        console.error('您的浏览器不支持WebSocket,无法接收实时消息');
        return null;
    }
    // Build the endpoint URL; use wss when the page itself was served over https.
    const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
    const host = window.location.host;
    // Encode the user id so characters such as '&', '#' or spaces cannot corrupt the query string.
    const url = `${protocol}//${host}/ws/message?userId=${encodeURIComponent(userId)}`;
    const ws = new WebSocket(url);

    ws.onopen = function() {
        console.log('WebSocket连接成功');
    };

    ws.onmessage = function(event) {
        console.log('收到新消息:', event.data);
        let message;
        try {
            message = JSON.parse(event.data);
        } catch (e) {
            // Not every frame is JSON (the server answers client frames with a
            // plain-text acknowledgement), so a parse failure is not an error.
            return;
        }
        // Only "new message" notifications are handled here.
        if (message && message.type === 'NEW_MESSAGE') {
            handleNewMessage(message);
        }
    };

    ws.onclose = function() {
        console.log('WebSocket连接关闭,尝试重新连接...');
        // Try again in 5 seconds.
        setTimeout(() => connectWebSocket(userId), 5000);
    };

    ws.onerror = function(error) {
        console.error('WebSocket错误:', error);
    };

    return ws;
}
// Reacts to a NEW_MESSAGE notification: shows a toast, refreshes the unread badge
// and, when the message list page is currently displayed, reloads the list.
function handleNewMessage(message) {
    const { title, content } = message;
    const heading = `新消息: ${title}`;

    // Pop up the notification.
    showNotification(heading, content);

    // Bring the unread counter up to date.
    updateUnreadCount();

    // The list only needs reloading while the user is looking at it.
    const listIsVisible = isOnMessageListPage();
    if (!listIsVisible) {
        return;
    }
    refreshMessageList();
}
// Shows a transient notification with the given title and content.
// The notification fades out after 3 seconds and is removed from the page 500 ms later.
// Minimal implementation; a real project would likely use a proper toast component.
function showNotification(title, content) {
    const notification = document.createElement('div');
    notification.className = 'message-notification';

    // Title and content come from message data, so they are assigned via textContent
    // rather than interpolated into innerHTML, which would let markup or script
    // contained in a message execute in the page.
    const titleElement = document.createElement('div');
    titleElement.className = 'notification-title';
    titleElement.textContent = title;

    const contentElement = document.createElement('div');
    contentElement.className = 'notification-content';
    contentElement.textContent = content;

    notification.appendChild(titleElement);
    notification.appendChild(contentElement);
    document.body.appendChild(notification);

    // Fade out after 3 seconds, then drop the element once the fade has had time to run.
    setTimeout(() => {
        notification.style.opacity = '0';
        setTimeout(() => notification.remove(), 500);
    }, 3000);
}
// Fetches the current unread-message count from the backend and updates the
// #unread-count badge. Reads the user id from the global `currentUserId`.
// Network and HTTP failures are logged and leave the badge unchanged.
function updateUnreadCount() {
    // Encode the user id so it cannot break or extend the query string.
    fetch('/api/message/unread/count?userId=' + encodeURIComponent(currentUserId))
        .then(response => {
            // fetch() only rejects on network failure; HTTP errors must be checked explicitly.
            if (!response.ok) {
                throw new Error('HTTP ' + response.status);
            }
            return response.json();
        })
        .then(data => {
            if (!data.success) {
                return;
            }
            const countElement = document.getElementById('unread-count');
            if (!countElement) {
                return;
            }
            countElement.textContent = data.data;
            // Show the red dot only while there are unread messages.
            if (data.data > 0) {
                countElement.classList.add('has-unread');
            } else {
                countElement.classList.remove('has-unread');
            }
        })
        .catch(error => {
            // Without this handler a failed request surfaces as an unhandled rejection.
            console.error('Failed to refresh unread message count:', error);
        });
}
为了支持短信和邮件发送功能,我们分别实现一个短信渠道和一个邮件渠道:
import com.example.message.channel.MessageChannel;
import com.example.message.entity.SysMessageChannel;
import com.example.message.mapper.SysMessageChannelMapper;
import com.example.message.vo.MessageSendVO;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.Map;
/**
* 短信消息渠道实现
*
* @author ken
*/
@Slf4j
@Component
public class SmsMessageChannel implements MessageChannel {
private final SysMessageChannelMapper channelMapper;
private final SmsService smsService;
public SmsMessageChannel(SysMessageChannelMapper channelMapper, SmsService smsService) {
this.channelMapper = channelMapper;
this.smsService = smsService;
}
@Override
public String getChannelCode() {
return "SMS";
}
@Override
public boolean send(MessageSendVO message) {
log.info("通过短信渠道发送消息,接收用户:{},消息标题:{}",
message.getReceiverIds(), message.getTitle());
try {
// 参数校验
if (ObjectUtils.isEmpty(message.getReceiverPhones()) || message.getReceiverPhones().isEmpty()) {
log.error("短信接收手机号不能为空");
return false;
}
// 获取短信渠道配置
SysMessageChannel channelConfig = getChannelConfig();
if (ObjectUtils.isEmpty(channelConfig) || !"1".equals(channelConfig.getIsEnabled())) {
log.error("短信渠道未配置或已禁用");
return false;
}
// 解析配置
Map<String, String> configMap = parseConfig(channelConfig.getConfig());
// 发送短信
for (String phone : message.getReceiverPhones()) {
// 短信内容通常有长度限制,这里做简单处理
String smsContent = StringUtils.hasText(message.getTitle()) ?
message.getTitle() + ":" + message.getContent() : message.getContent();
// 截取过长的内容
if (smsContent.length() > 500) {
smsContent = smsContent.substring(0, 500) + "...";
}
// 调用短信服务发送
boolean success = smsService.sendSms(
configMap.get("templateId"),
phone,
message.getVariables());
if (success) {
log.info("短信发送成功,手机号:{}", phone);
} else {
log.error("短信发送失败,手机号:{}", phone);
}
}
return true;
} catch (Exception e) {
log.error("短信发送失败", e);
return false;
}
}
/**
* 获取渠道配置
*
* @return 渠道配置
*/
private SysMessageChannel getChannelConfig() {
return channelMapper.selectOne(
new LambdaQueryWrapper<SysMessageChannel>()
.eq(SysMessageChannel::getChannelCode, getChannelCode())
.eq(SysMessageChannel::getIsEnabled, 1)
);
}
/**
* 解析配置JSON
*
* @param configJson 配置JSON字符串
* @return 配置映射
*/
private Map<String, String> parseConfig(String configJson) {
if (StringUtils.isEmpty(configJson)) {
return Maps.newHashMap();
}
try {
return JSON.parseObject(configJson, Map.class);
} catch (Exception e) {
log.error("解析渠道配置失败,配置内容:{}", configJson, e);
return Maps.newHashMap();
}
}
}
import com.example.message.channel.MessageChannel;
import com.example.message.entity.SysMessageChannel;
import com.example.message.mapper.SysMessageChannelMapper;
import com.example.message.vo.MessageSendVO;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.util.Map;
/**
 * E-mail implementation of {@code MessageChannel}.
 * <p>
 * NOTE(review): {@code LambdaQueryWrapper} and {@code JSON} are referenced below but
 * do not appear in the import list that precedes this class; both need an import for
 * the class to compile.
 *
 * @author ken
 */
@Slf4j
@Component
public class EmailMessageChannel implements MessageChannel {
    private final SysMessageChannelMapper channelMapper;
    private final JavaMailSender mailSender;

    public EmailMessageChannel(SysMessageChannelMapper channelMapper, JavaMailSender mailSender) {
        this.channelMapper = channelMapper;
        this.mailSender = mailSender;
    }

    @Override
    public String getChannelCode() {
        return "EMAIL";
    }

    /**
     * Sends the message as one HTML e-mail addressed to every entry of
     * {@code message.getReceiverEmails()}.
     *
     * @param message message to send; receiver e-mails, title and content are required
     * @return {@code true} if the mail was handed to the mail sender; {@code false} if
     *         validation fails, the channel is not configured/enabled, no sender
     *         address is configured, or sending throws
     */
    @Override
    public boolean send(MessageSendVO message) {
        log.info("通过邮件渠道发送消息,接收用户:{},消息标题:{}",
                message.getReceiverIds(), message.getTitle());
        try {
            // Validate input.
            if (ObjectUtils.isEmpty(message.getReceiverEmails())) {
                log.error("邮件接收地址不能为空");
                return false;
            }
            if (StringUtils.isEmpty(message.getTitle())) {
                log.error("邮件标题不能为空");
                return false;
            }
            if (StringUtils.isEmpty(message.getContent())) {
                log.error("邮件内容不能为空");
                return false;
            }
            // getChannelConfig() already selects enabled rows only, so a non-null result
            // means the channel is enabled. The former extra check compared the flag to
            // the string "1" although the query compares it to the number 1.
            SysMessageChannel channelConfig = getChannelConfig();
            if (channelConfig == null) {
                log.error("邮件渠道未配置或已禁用");
                return false;
            }
            Map<String, String> configMap = parseConfig(channelConfig.getConfig());
            String fromEmail = configMap.get("fromEmail");
            if (StringUtils.isEmpty(fromEmail)) {
                log.error("发件人邮箱未配置");
                return false;
            }
            MimeMessage mimeMessage = mailSender.createMimeMessage();
            // Set the charset explicitly: subject and body are typically non-ASCII and
            // must not depend on the platform default encoding.
            MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true, "UTF-8");
            helper.setFrom(fromEmail);
            // NOTE(review): all recipients share the To header, so each one can see the
            // others' addresses. Consider BCC or one mail per recipient if that matters.
            helper.setTo(message.getReceiverEmails().toArray(new String[0]));
            helper.setSubject(message.getTitle());
            helper.setText(message.getContent(), true); // true = the body is HTML
            mailSender.send(mimeMessage);
            log.info("邮件发送成功,收件人数量:{}", message.getReceiverEmails().size());
            return true;
        } catch (MessagingException e) {
            log.error("邮件发送失败", e);
            return false;
        } catch (Exception e) {
            log.error("邮件发送过程中发生异常", e);
            return false;
        }
    }

    /**
     * Loads this channel's configuration row.
     *
     * @return the enabled configuration for this channel code, or {@code null} if the
     *         query finds none
     */
    private SysMessageChannel getChannelConfig() {
        return channelMapper.selectOne(
                new LambdaQueryWrapper<SysMessageChannel>()
                        .eq(SysMessageChannel::getChannelCode, getChannelCode())
                        .eq(SysMessageChannel::getIsEnabled, 1)
        );
    }

    /**
     * Parses the channel configuration JSON.
     *
     * @param configJson configuration as a JSON string
     * @return the parsed key/value pairs; an empty map if the input is empty or
     *         cannot be parsed
     */
    private Map<String, String> parseConfig(String configJson) {
        if (StringUtils.isEmpty(configJson)) {
            return Maps.newHashMap();
        }
        try {
            return JSON.parseObject(configJson, Map.class);
        } catch (Exception e) {
            log.error("解析渠道配置失败,配置内容:{}", configJson, e);
            return Maps.newHashMap();
        }
    }
}
import com.example.message.service.MessageService;
import com.example.message.vo.MessageSendVO;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Spring Boot tests for {@code MessageService}: one direct send and one
 * template-based send, each expected to report success.
 *
 * @author ken
 */
@SpringBootTest
public class MessageServiceTest {
    @Autowired
    private MessageService messageService;

    /** Sending a plain system notification to three users should succeed. */
    @Test
    public void testSendMessage() {
        MessageSendVO request = systemMessageTo("user1", "user2", "user3");
        request.setTitle("测试消息");
        request.setContent("这是一条测试消息");
        request.setMessageType(1);

        boolean sent = messageService.sendMessage(request);

        assertTrue(sent, "消息发送失败");
    }

    /** Sending through the ORDER_NOTIFY template with its variables should succeed. */
    @Test
    public void testSendMessageByTemplate() {
        MessageSendVO request = systemMessageTo("user1", "user2");
        // Values for the template's placeholders.
        Map<String, Object> templateVariables = new HashMap<>();
        templateVariables.put("username", "测试用户");
        templateVariables.put("orderId", "ORDER123456");
        templateVariables.put("amount", "99.99");
        request.setVariables(templateVariables);

        boolean sent = messageService.sendMessageByTemplate("ORDER_NOTIFY", request);

        assertTrue(sent, "通过模板发送消息失败");
    }

    /** Creates a message sent by the "system" sender to the given receivers. */
    private static MessageSendVO systemMessageTo(String... receiverIds) {
        MessageSendVO request = new MessageSendVO();
        request.setSenderId("system");
        request.setSenderName("系统");
        request.setReceiverIds(Arrays.asList(receiverIds));
        return request;
    }
}
本文详细介绍了如何设计和实现一套功能完善、可扩展的站内消息系统。从需求分析到架构设计,从数据库表结构到核心代码实现,我们全面覆盖了站内消息系统的各个方面。
系统的核心特点包括: