本文介绍一种基于Redis实现的生产级消息队列设计方案,该方案在保证可靠性的同时,兼顾了性能和易用性。
核心设计目标:
+----------------+ +-------------------+ +-------------+
| 主队列 | | 处理中队列 | | 死信队列 |
| (queueName) |<--->| (processingName) |---->| (dlqName) |
+----------------+ +-------------------+ +-------------+
^ ^
| |
| 消息入队 | 消息处理
| |
+-----+----------------------+-----+
| 生产者 |
+---------------------------------+
|
v
+---------------------------------+
| 消费者 |
+---------------------------------+
新消息
-> 入队到主队列
-> 消费者取出移至处理中队列
-> 处理成功则删除
-> 处理失败根据重试策略决定重新入队或进入死信队列
// 队列状态定义
type QueueState struct {
MainQueue string // 主队列: "queue:{name}"
ProcessingQueue string // 处理中队列: "queue:{name}:processing"
DeadLetterQueue string // 死信队列: "queue:{name}:dlq"
ProcessingMarker string // 处理标记前缀: "queue:{name}:marker:"
}
设计要点:
// 消息实体
type QueueMessage struct {
ID string // 消息唯一ID
Body []byte // 消息体
Attempts int // 已尝试次数
Timestamp time.Time // 入队时间
}
// 队列配置
type QueueConfig struct {
Name string
MaxRetries int // 最大重试次数
VisibilityTimeout time.Duration // 消息可见超时
ProcessTimeout time.Duration // 处理超时
}
// Enqueue 将消息加入队列
func (q *RedisQueue) Enqueue(ctx context.Context, msg *QueueMessage) error {
// 序列化消息对象为字节数组,便于Redis存储
// serializeMessage 是自定义的序列化方法,可以使用JSON、Protobuf等格式
data, err := serializeMessage(msg)
if err != nil {
// 如果序列化失败,返回序列化错误
return fmt.Errorf("message serialization failed: %w", err)
}
// 使用Redis Pipeline批量执行以下操作,减少网络往返次数:
// 1. 将消息LPUSH到主队列头部(保证新消息优先处理)
// 2. INCR队列长度计数器(用于监控队列长度)
pipe := q.client.Pipeline()
// LPUSH命令将消息放入队列头部
pipe.LPush(ctx, q.state.MainQueue, data)
// INCR命令原子性增加队列长度计数
pipe.Incr(ctx, q.metrics.QueueLengthKey)
// 执行Pipeline中的所有命令
// Exec返回命令执行结果和可能的错误
_, err = pipe.Exec(ctx)
if err != nil {
// 如果执行失败,返回Redis操作错误
return fmt.Errorf("redis pipeline execution failed: %w", err)
}
// 成功入队后,增加入队计数器指标
// Enqueued是Prometheus Counter类型指标
q.metrics.Enqueued.Inc()
// 返回nil表示操作成功完成
return nil
}
// Dequeue 从队列中取出消息进行处理
func (q *RedisQueue) Dequeue(ctx context.Context) (*QueueMessage, error) {
// 使用Lua脚本保证操作的原子性,执行以下操作:
// 1. 使用RPOPLPUSH从主队列取出消息并放入处理中队列
// 2. 为取出的消息设置处理标记和过期时间
script := `
-- KEYS[1]: 主队列名称
-- KEYS[2]: 处理中队列名称
-- KEYS[3]: 处理标记前缀
-- ARGV[1]: 可见性超时时间(秒)
-- 原子性地从主队列尾部取出消息并添加到处理中队列头部
local msg = redis.call('RPOPLPUSH', KEYS[1], KEYS[2])
-- 如果成功获取到消息
if msg then
-- 设置处理标记,使用SETEX命令设置键值并指定过期时间
-- 格式:processingMarker + 消息内容 作为key,"1"为值
redis.call('SETEX', KEYS[3]..msg, ARGV[1], '1')
end
-- 返回消息内容(可能为nil)
return msg
`
// 执行Lua脚本,传入参数:
// 1. 上下文ctx
// 2. Lua脚本内容
// 3. KEYS数组参数:主队列名、处理中队列名、处理标记前缀
// 4. ARGV数组参数:可见性超时时间(秒)
result, err := q.client.Eval(ctx, script,
[]string{q.state.MainQueue, q.state.ProcessingQueue, q.state.ProcessingMarker},
q.config.VisibilityTimeout.Seconds()).Result()
if err != nil {
// 如果执行出错,返回错误信息
// 特殊处理空队列情况(redis.Nil错误)
if err == redis.Nil {
return nil, nil
}
return nil, fmt.Errorf("failed to execute dequeue script: %w", err)
}
// 如果结果为nil(队列为空),直接返回
if result == nil {
return nil, nil
}
// 将Redis返回的字符串结果断言为string类型
msgStr, ok := result.(string)
if !ok {
// 类型断言失败处理
return nil, fmt.Errorf("unexpected message type: %T", result)
}
// 反序列化消息内容为QueueMessage对象
msg, err := deserializeMessage(msgStr)
if err != nil {
return nil, fmt.Errorf("message deserialization failed: %w", err)
}
// 返回反序列化后的消息对象
return msg, nil
}
// Ack 确认消息处理完成
// ctx: 上下文对象,用于控制请求的生命周期(超时、取消等)
// msgID: 要确认的消息ID
// 返回值: 错误对象,nil表示操作成功
func (q *RedisQueue) Ack(ctx context.Context, msgID string) error {
// 使用Redis Pipeline批量执行以下原子操作:
// 1. 从处理中队列移除该消息
// 2. 删除该消息的处理标记
// 3. 减少队列长度计数器
// 创建Pipeline对象,用于批量执行Redis命令
pipe := q.client.Pipeline()
// 命令1: 从处理中队列移除指定消息
// LRem命令参数:
// - key: 处理中队列名称
// - count: 1表示只移除第一个匹配的元素
// - value: 要移除的消息ID
pipe.LRem(ctx, q.state.ProcessingQueue, 1, msgID)
// 命令2: 删除该消息的处理标记
// Del命令删除以ProcessingMarker为前缀加上msgID构成的key
pipe.Del(ctx, q.state.ProcessingMarker+msgID)
// 命令3: 减少队列长度计数器
// Decr命令将计数器减1
pipe.Decr(ctx, q.metrics.QueueLengthKey)
// 执行Pipeline中的所有命令
// Exec返回命令执行结果和可能的错误
_, err := pipe.Exec(ctx)
if err != nil {
// 如果执行出错,返回包装后的错误信息
return fmt.Errorf("failed to execute ack pipeline: %w", err)
}
// 更新成功处理的消息计数器
// Processed是Prometheus Counter类型指标
q.metrics.Processed.Inc()
// 返回nil表示操作成功完成
return nil
}
// Retry 对处理失败的消息进行重试处理
// ctx: 上下文对象,用于传递截止时间和取消信号
// msg: 需要重试的消息对象指针
// 返回值: 错误对象,nil表示操作成功
func (q *RedisQueue) Retry(ctx context.Context, msg *QueueMessage) error {
// 检查消息当前重试次数是否已达到配置的最大重试次数
// MaxRetries 是在队列配置中设定的最大允许重试次数
if msg.Attempts >= q.config.MaxRetries {
// 如果已达到最大重试次数,则将消息转移到死信队列(DLQ)
// moveToDLQ 是内部方法,负责将消息从处理队列移动到死信队列
return q.moveToDLQ(ctx, msg)
}
// 更新消息的重试相关属性:
// 1. 增加重试计数器 Attempts
// 2. 更新时间戳为当前时间
msg.Attempts++ // 递增重试次数计数器
msg.Timestamp = time.Now() // 更新时间戳为当前时间
// 调用Enqueue方法将消息重新放回主队列头部
// 使用LPUSH操作保证重试的消息能够优先被再次处理
// Enqueue方法会处理序列化和监控指标更新等逻辑
return q.Enqueue(ctx, msg)
}
// recoverStuckMessages 恢复处理中队列里卡住的消息
// 该方法会定期执行,检查处理中超时的消息并重新入队
// ctx: 上下文对象,用于控制请求的生命周期(超时、取消等)
func (q *RedisQueue) recoverStuckMessages(ctx context.Context) {
// 使用LRange命令获取处理中队列的所有消息
// 参数说明:
// - key: 处理中队列名称
// - start: 0 表示从第一个元素开始
// - stop: -1 表示到最后一个元素
// 忽略错误是为了保证恢复流程不被单个错误中断
msgs, _ := q.client.LRange(ctx, q.state.ProcessingQueue, 0, -1).Result()
// 遍历处理中队列里的所有消息
for _, msgID := range msgs {
// 检查该消息的处理标记是否还存在(是否已过期)
// 处理标记的key格式为:processingMarkerPrefix + msgID
exists, _ := q.client.Exists(ctx, q.state.ProcessingMarker+msgID).Result()
// exists == 0 表示处理标记已过期(不存在)
if exists == 0 {
// 将消息重新放回主队列头部,使用LPUSH保证优先处理
// 忽略错误是为了保证恢复流程继续执行
q.client.LPush(ctx, q.state.MainQueue, msgID)
// 从处理中队列移除该消息
// LRem参数说明:
// - key: 处理中队列名称
// - count: 1 表示只移除第一个匹配项
// - value: 要移除的消息ID
q.client.LRem(ctx, q.state.ProcessingQueue, 1, msgID)
}
}
}
// moveToDLQ 将消息移动到死信队列(DLQ)
// ctx: 上下文对象,用于传递截止时间和取消信号
// msg: 要移动到死信队列的消息对象指针
// 返回值: 错误对象,nil表示操作成功
func (q *RedisQueue) moveToDLQ(ctx context.Context, msg *QueueMessage) error {
// 序列化消息对象为字节数组,便于Redis存储
// 忽略序列化错误,因为即使序列化失败我们也希望继续处理
data, _ := serializeMessage(msg)
// 使用Redis Pipeline批量执行以下原子操作:
// 1. 从处理中队列移除该消息
// 2. 将消息加入死信队列
// 3. 删除该消息的处理标记
// 创建Pipeline对象,用于批量执行Redis命令
pipe := q.client.Pipeline()
// 命令1: 从处理中队列移除该消息
// LRem命令参数:
// - key: 处理中队列名称
// - count: 1表示只移除第一个匹配的元素
// - value: 要移除的消息数据
pipe.LRem(ctx, q.state.ProcessingQueue, 1, data)
// 命令2: 将消息加入死信队列头部
// LPush命令将消息放入死信队列头部
pipe.LPush(ctx, q.state.DeadLetterQueue, data)
// 命令3: 删除该消息的处理标记
// Del命令删除以ProcessingMarker为前缀加上消息ID构成的key
pipe.Del(ctx, q.state.ProcessingMarker+msg.ID)
// 执行Pipeline中的所有命令
// Exec返回命令执行结果和可能的错误
_, err := pipe.Exec(ctx)
if err != nil {
// 如果执行出错,返回包装后的错误信息
return fmt.Errorf("failed to move message to DLQ: %w", err)
}
// 更新死信队列消息计数器
// DeadLetters是Prometheus Counter类型指标
q.metrics.DeadLetters.Inc()
// 返回nil表示操作成功完成
return nil
}
// ConsumeBatch 批量消费队列消息
// ctx: 上下文对象,用于传递取消信号和超时控制
// handler: 消息处理器,实现MessageHandler接口
// batchSize: 每批处理的消息数量
func (q *RedisQueue) ConsumeBatch(ctx context.Context, handler MessageHandler, batchSize int) {
// 使用无限循环持续消费消息,直到收到取消信号
for {
select {
// 检查上下文是否被取消
case <-ctx.Done():
// 收到取消信号,退出消费循环
return
// 默认情况下继续处理消息
default:
// 从队列中批量获取消息
// dequeueBatch是内部方法,负责从Redis获取一批消息
msgs := q.dequeueBatch(ctx, batchSize)
// 如果没有获取到消息
if len(msgs) == 0 {
// 短暂休眠避免空轮询消耗CPU
time.Sleep(100 * time.Millisecond)
// 跳过本次循环,继续尝试获取消息
continue
}
// 使用WaitGroup等待所有消息处理完成
var wg sync.WaitGroup
// 并行处理本批所有消息
for _, msg := range msgs {
// 为每个消息增加WaitGroup计数器
wg.Add(1)
// 为每个消息启动一个goroutine并行处理
go func(m *QueueMessage) {
// 确保在goroutine结束时减少WaitGroup计数器
defer wg.Done()
// 调用处理器处理消息
if err := handler.Process(ctx, m); err != nil {
// 处理失败,进行重试
// Retry方法会根据重试策略处理消息
q.Retry(ctx, m)
} else {
// 处理成功,确认消息
// Ack方法会从处理中队列移除消息
q.Ack(ctx, m.ID)
}
}(msg) // 注意:这里将msg作为参数传入闭包,避免循环变量捕获问题
}
// 等待本批所有消息处理完成
wg.Wait()
}
}
}
// MessageHandler 定义消息处理器的接口
// 任何实现了Process方法的类型都可以作为消息处理器
type MessageHandler interface {
// Process 处理队列中的消息
// ctx: 上下文对象,用于传递截止时间和取消信号
// msg: 要处理的消息对象指针
// 返回值:
// - nil 表示处理成功
// - 非nil 表示处理失败,需要重试
Process(ctx context.Context, msg *QueueMessage) error
}
// ExampleHandler 是MessageHandler接口的一个示例实现
// 包含处理消息所需的依赖资源
type ExampleHandler struct {
// 这里可以定义处理器依赖的资源
// 例如: 数据库连接、HTTP客户端、配置参数等
// db *sql.DB // 数据库连接示例
// client *http.Client // HTTP客户端示例
// config Config // 配置参数示例
}
// Process 实现MessageHandler接口的处理方法
// ctx: 上下文对象,用于控制请求的生命周期
// msg: 要处理的消息对象指针
// 返回值:
// - nil 表示处理成功,消息将被确认(ACK)
// - 非nil 表示处理失败,消息将根据重试策略处理
func (h *ExampleHandler) Process(ctx context.Context, msg *QueueMessage) error {
// 在这里实现具体的消息处理逻辑
// 示例步骤:
// 1. 反序列化消息体
// 2. 执行业务逻辑(如更新数据库、调用API等)
// 3. 处理结果判断
// 示例处理流程:
// var payload MyPayload
// if err := json.Unmarshal(msg.Body, &payload); err != nil {
// return fmt.Errorf("failed to unmarshal payload: %w", err)
// }
//
// if err := h.db.ExecContext(ctx, "UPDATE...", payload.Data); err != nil {
// return fmt.Errorf("db operation failed: %w", err)
// }
//
// if err := h.client.PostJSON(ctx, "/api/update", payload); err != nil {
// return fmt.Errorf("API call failed: %w", err)
// }
// 返回nil表示处理成功
return nil
// 如果需要重试,可以返回错误:
// return fmt.Errorf("temporary error, should retry")
// 对于不可恢复的错误,可以使用特殊错误类型:
// return PermanentError("invalid message format")
}
type QueueMetrics struct {
Enqueued prometheus.Counter // 入队总数
Dequeued prometheus.Counter // 出队总数
Processed prometheus.Counter // 处理成功数
Retries prometheus.Counter // 重试次数
DeadLetters prometheus.Counter // 死信数量
QueueLength prometheus.Gauge // 当前队列长度
ProcessTime prometheus.Histogram // 处理耗时分布
Lag prometheus.Gauge // 消费延迟
}
// 注册指标
func (m *QueueMetrics) Register() {
prometheus.MustRegister(
m.Enqueued,
m.Dequeued,
// ...其他指标
)
}
// 在关键操作点更新指标
func (q *RedisQueue) processWithMetrics(ctx context.Context, msg *QueueMessage, handler MessageHandler) {
start := time.Now()
err := handler.Process(ctx, msg)
// 记录处理时间
q.metrics.ProcessTime.Observe(time.Since(start).Seconds())
if err != nil {
q.metrics.Retries.Inc()
q.Retry(ctx, msg)
} else {
q.metrics.Processed.Inc()
q.Ack(ctx, msg.ID)
}
}
// PriorityQueue 定义了一个支持优先级的消息队列结构
// 使用多个Redis List分别存储不同优先级的消息
type PriorityQueue struct {
// HighPriority 高优先级队列的Redis key名称
// 用于存储需要优先处理的重要消息
HighPriority string
// NormalPriority 普通优先级队列的Redis key名称
// 用于存储一般重要的消息
NormalPriority string
// LowPriority 低优先级队列的Redis key名称
// 用于存储可以延后处理的普通消息
LowPriority string
// client Redis客户端实例,用于执行Redis命令
// 这里应该有一个redis.Client字段,原代码中缺失了
client *redis.Client
}
// Enqueue 将消息按照优先级加入到对应的队列中
// priority: 消息的优先级等级,整数类型
// - >=2 表示高优先级
// - ==1 表示普通优先级
// - <=0 表示低优先级
// msg: 要入队的消息对象指针
// ctx: 上下文对象,用于控制请求的生命周期(原代码中缺失此参数)
func (q *PriorityQueue) Enqueue(ctx context.Context, priority int, msg *Message) error {
// 使用switch语句根据优先级选择目标队列
switch {
// 优先级大于等于2,放入高优先级队列
case priority >= 2:
// 使用LPUSH命令将消息放入队列头部
// 保证高优先级消息能够被优先消费
if err := q.client.LPush(ctx, q.HighPriority, msg).Err(); err != nil {
return fmt.Errorf("failed to enqueue high priority message: %w", err)
}
// 优先级等于1,放入普通优先级队列
case priority == 1:
// 使用LPUSH命令将消息放入队列头部
if err := q.client.LPush(ctx, q.NormalPriority, msg).Err(); err != nil {
return fmt.Errorf("failed to enqueue normal priority message: %w", err)
}
// 其他情况(优先级<=0),放入低优先级队列
default:
// 使用LPUSH命令将消息放入队列头部
if err := q.client.LPush(ctx, q.LowPriority, msg).Err(); err != nil {
return fmt.Errorf("failed to enqueue low priority message: %w", err)
}
}
// 消息入队成功,返回nil
return nil
}
// Schedule 将消息加入延迟队列,在指定延迟时间后才会被消费
// ctx: 上下文对象,用于传递截止时间和取消信号
// msg: 要延迟的消息对象
// delay: 延迟的时间长度
// 返回值: 错误对象,nil表示操作成功
func (q *RedisQueue) Schedule(ctx context.Context, msg *Message, delay time.Duration) error {
// 计算消息应该被执行的具体时间点
// 当前时间加上延迟时间,转换为Unix时间戳(秒级精度)
executeAt := time.Now().Add(delay).Unix()
// 将消息添加到Redis的有序集合(Sorted Set)中:
// - 有序集合的key为"delayed_queue"
// - 使用执行时间戳作为score(排序分数)
// - 消息ID作为member(集合成员)
// ZAdd命令返回操作结果和可能的错误
return q.client.ZAdd(ctx, "delayed_queue", &redis.Z{
Score: float64(executeAt), // 转换为float64类型作为排序分数
Member: msg.ID, // 消息唯一标识作为集合成员
}).Err() // 提取错误信息返回
}
// checkDelayed 检查延迟队列中已到期的消息,并将其移入主队列
// ctx: 上下文对象,用于传递截止时间和取消信号
func (q *RedisQueue) checkDelayed(ctx context.Context) error {
// 获取当前时间的Unix时间戳(秒级精度)
now := time.Now().Unix()
// 使用ZRangeByScore查询所有已到期的消息:
// - 查询score从0到当前时间戳的消息(即应该已经到期的消息)
// - Min:"0"表示最小score为0
// - Max:当前时间戳转换为字符串
// 忽略错误是为了保证流程继续执行
msgs, _ := q.client.ZRangeByScore(ctx, "delayed_queue", &redis.ZRangeBy{
Min: "0", // 最小score
Max: strconv.FormatInt(now, 10), // 最大score(当前时间)
}).Result() // 获取字符串形式的结果
// 使用Pipeline批量处理到期消息
pipe := q.client.Pipeline()
for _, msgID := range msgs {
// 将到期的消息LPUSH到主队列头部,使其能够被尽快消费
pipe.LPush(ctx, q.state.MainQueue, msgID)
// 从延迟队列中移除该消息
pipe.ZRem(ctx, "delayed_queue", msgID)
}
// 执行Pipeline中的所有命令
// 返回可能出现的错误
_, err := pipe.Exec(ctx)
return err
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。