首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于Redis实现生产级消息队列

基于Redis实现生产级消息队列

原创
作者头像
GeekLiHua
发布2025-07-06 17:53:56
发布2025-07-06 17:53:56
2120
举报
文章被收录于专栏:高并发高并发

基于Redis实现生产级消息队列

一、设计背景与目标

本文介绍一种基于Redis实现的生产级消息队列设计方案,该方案在保证可靠性的同时,兼顾了性能和易用性。

核心设计目标

  1. 高可靠性:确保消息不丢失、不重复消费
  2. 完备的可观测性:完善的监控指标和日志体系
  3. 生产级特性:连接池优化、超时控制、自动恢复
  4. 轻量级部署:无需引入额外中间件

二、整体架构设计

1. 多队列协同架构

代码语言:go
复制
  +----------------+     +-------------------+     +-------------+
  |   主队列       |     |   处理中队列      |     |  死信队列   |
  | (queueName)    |<--->| (processingName)  |---->| (dlqName)   |
  +----------------+     +-------------------+     +-------------+
        ^                      ^
        |                      |
        | 消息入队             | 消息处理
        |                      |
  +-----+----------------------+-----+
  |           生产者                |
  +---------------------------------+
               |
               v
  +---------------------------------+
  |           消费者                |
  +---------------------------------+

2. 消息状态流转

代码语言:go
复制
新消息 
  -> 入队到主队列 
  -> 消费者取出移至处理中队列 
  -> 处理成功则删除 
  -> 处理失败根据重试策略决定重新入队或进入死信队列

三、核心代码设计思路

一、基础架构设计

1. 多队列状态管理
代码语言:go
复制
// 队列状态定义
type QueueState struct {
    MainQueue      string // 主队列: "queue:{name}"
    ProcessingQueue string // 处理中队列: "queue:{name}:processing" 
    DeadLetterQueue string // 死信队列: "queue:{name}:dlq"
    ProcessingMarker string // 处理标记前缀: "queue:{name}:marker:"
}

设计要点

  • 使用Redis List实现主队列和处理中队列
  • 死信队列存储彻底失败的消息
  • 处理标记使用Redis String存储,设置TTL
2. 核心数据结构
代码语言:go
复制
// 消息实体
type QueueMessage struct {
    ID        string                 // 消息唯一ID
    Body      []byte                 // 消息体
    Attempts  int                    // 已尝试次数
    Timestamp time.Time              // 入队时间
}

// 队列配置
type QueueConfig struct {
    Name            string
    MaxRetries      int           // 最大重试次数
    VisibilityTimeout time.Duration // 消息可见超时
    ProcessTimeout  time.Duration // 处理超时
}

二、关键操作设计

1. 消息生产设计
代码语言:go
复制
// Enqueue 将消息加入队列
func (q *RedisQueue) Enqueue(ctx context.Context, msg *QueueMessage) error {
    // 序列化消息对象为字节数组,便于Redis存储
    // serializeMessage 是自定义的序列化方法,可以使用JSON、Protobuf等格式
    data, err := serializeMessage(msg)
    if err != nil {
        // 如果序列化失败,返回序列化错误
        return fmt.Errorf("message serialization failed: %w", err)
    }

    // 使用Redis Pipeline批量执行以下操作,减少网络往返次数:
    // 1. 将消息LPUSH到主队列头部(保证新消息优先处理)
    // 2. INCR队列长度计数器(用于监控队列长度)
    pipe := q.client.Pipeline()
    // LPUSH命令将消息放入队列头部
    pipe.LPush(ctx, q.state.MainQueue, data)
    // INCR命令原子性增加队列长度计数
    pipe.Incr(ctx, q.metrics.QueueLengthKey)

    // 执行Pipeline中的所有命令
    // Exec返回命令执行结果和可能的错误
    _, err = pipe.Exec(ctx)
    if err != nil {
        // 如果执行失败,返回Redis操作错误
        return fmt.Errorf("redis pipeline execution failed: %w", err)
    }

    // 成功入队后,增加入队计数器指标
    // Enqueued是Prometheus Counter类型指标
    q.metrics.Enqueued.Inc()

    // 返回nil表示操作成功完成
    return nil
}
2. 消息消费设计
代码语言:go
复制
// Dequeue 从队列中取出消息进行处理
func (q *RedisQueue) Dequeue(ctx context.Context) (*QueueMessage, error) {
    // 使用Lua脚本保证操作的原子性,执行以下操作:
    // 1. 使用RPOPLPUSH从主队列取出消息并放入处理中队列
    // 2. 为取出的消息设置处理标记和过期时间
    script := `
        -- KEYS[1]: 主队列名称
        -- KEYS[2]: 处理中队列名称 
        -- KEYS[3]: 处理标记前缀
        -- ARGV[1]: 可见性超时时间(秒)
        
        -- 原子性地从主队列尾部取出消息并添加到处理中队列头部
        local msg = redis.call('RPOPLPUSH', KEYS[1], KEYS[2])
        
        -- 如果成功获取到消息
        if msg then
            -- 设置处理标记,使用SETEX命令设置键值并指定过期时间
            -- 格式:processingMarker + 消息内容 作为key,"1"为值
            redis.call('SETEX', KEYS[3]..msg, ARGV[1], '1')
        end
        
        -- 返回消息内容(可能为nil)
        return msg
    `
    
    // 执行Lua脚本,传入参数:
    // 1. 上下文ctx
    // 2. Lua脚本内容
    // 3. KEYS数组参数:主队列名、处理中队列名、处理标记前缀
    // 4. ARGV数组参数:可见性超时时间(秒)
    result, err := q.client.Eval(ctx, script, 
        []string{q.state.MainQueue, q.state.ProcessingQueue, q.state.ProcessingMarker},
        q.config.VisibilityTimeout.Seconds()).Result()
    
    if err != nil {
        // 如果执行出错,返回错误信息
        // 特殊处理空队列情况(redis.Nil错误)
        if err == redis.Nil {
            return nil, nil
        }
        return nil, fmt.Errorf("failed to execute dequeue script: %w", err)
    }
    
    // 如果结果为nil(队列为空),直接返回
    if result == nil {
        return nil, nil
    }
    
    // 将Redis返回的字符串结果断言为string类型
    msgStr, ok := result.(string)
    if !ok {
        // 类型断言失败处理
        return nil, fmt.Errorf("unexpected message type: %T", result)
    }
    
    // 反序列化消息内容为QueueMessage对象
    msg, err := deserializeMessage(msgStr)
    if err != nil {
        return nil, fmt.Errorf("message deserialization failed: %w", err)
    }
    
    // 返回反序列化后的消息对象
    return msg, nil
}
3. 消息确认设计
代码语言:go
复制
// Ack 确认消息处理完成
// ctx: 上下文对象,用于控制请求的生命周期(超时、取消等)
// msgID: 要确认的消息ID
// 返回值: 错误对象,nil表示操作成功
func (q *RedisQueue) Ack(ctx context.Context, msgID string) error {
    // 使用Redis Pipeline批量执行以下原子操作:
    // 1. 从处理中队列移除该消息
    // 2. 删除该消息的处理标记
    // 3. 减少队列长度计数器
    
    // 创建Pipeline对象,用于批量执行Redis命令
    pipe := q.client.Pipeline()
    
    // 命令1: 从处理中队列移除指定消息
    // LRem命令参数:
    // - key: 处理中队列名称
    // - count: 1表示只移除第一个匹配的元素
    // - value: 要移除的消息ID
    pipe.LRem(ctx, q.state.ProcessingQueue, 1, msgID)
    
    // 命令2: 删除该消息的处理标记
    // Del命令删除以ProcessingMarker为前缀加上msgID构成的key
    pipe.Del(ctx, q.state.ProcessingMarker+msgID)
    
    // 命令3: 减少队列长度计数器
    // Decr命令将计数器减1
    pipe.Decr(ctx, q.metrics.QueueLengthKey)
    
    // 执行Pipeline中的所有命令
    // Exec返回命令执行结果和可能的错误
    _, err := pipe.Exec(ctx)
    if err != nil {
        // 如果执行出错,返回包装后的错误信息
        return fmt.Errorf("failed to execute ack pipeline: %w", err)
    }
    
    // 更新成功处理的消息计数器
    // Processed是Prometheus Counter类型指标
    q.metrics.Processed.Inc()
    
    // 返回nil表示操作成功完成
    return nil
}
4. 消息重试设计
代码语言:go
复制
// Retry 对处理失败的消息进行重试处理
// ctx: 上下文对象,用于传递截止时间和取消信号
// msg: 需要重试的消息对象指针
// 返回值: 错误对象,nil表示操作成功
func (q *RedisQueue) Retry(ctx context.Context, msg *QueueMessage) error {
    // 检查消息当前重试次数是否已达到配置的最大重试次数
    // MaxRetries 是在队列配置中设定的最大允许重试次数
    if msg.Attempts >= q.config.MaxRetries {
        // 如果已达到最大重试次数,则将消息转移到死信队列(DLQ)
        // moveToDLQ 是内部方法,负责将消息从处理队列移动到死信队列
        return q.moveToDLQ(ctx, msg)
    }
    
    // 更新消息的重试相关属性:
    // 1. 增加重试计数器 Attempts
    // 2. 更新时间戳为当前时间
    msg.Attempts++  // 递增重试次数计数器
    msg.Timestamp = time.Now()  // 更新时间戳为当前时间
    
    // 调用Enqueue方法将消息重新放回主队列头部
    // 使用LPUSH操作保证重试的消息能够优先被再次处理
    // Enqueue方法会处理序列化和监控指标更新等逻辑
    return q.Enqueue(ctx, msg)
}

三、可靠性保障设计

1. 消息恢复机制
代码语言:go
复制
// recoverStuckMessages 恢复处理中队列里卡住的消息
// 该方法会定期执行,检查处理中超时的消息并重新入队
// ctx: 上下文对象,用于控制请求的生命周期(超时、取消等)
func (q *RedisQueue) recoverStuckMessages(ctx context.Context) {
    // 使用LRange命令获取处理中队列的所有消息
    // 参数说明:
    // - key: 处理中队列名称
    // - start: 0 表示从第一个元素开始
    // - stop: -1 表示到最后一个元素
    // 忽略错误是为了保证恢复流程不被单个错误中断
    msgs, _ := q.client.LRange(ctx, q.state.ProcessingQueue, 0, -1).Result()
    
    // 遍历处理中队列里的所有消息
    for _, msgID := range msgs {
        // 检查该消息的处理标记是否还存在(是否已过期)
        // 处理标记的key格式为:processingMarkerPrefix + msgID
        exists, _ := q.client.Exists(ctx, q.state.ProcessingMarker+msgID).Result()
        
        // exists == 0 表示处理标记已过期(不存在)
        if exists == 0 {
            // 将消息重新放回主队列头部,使用LPUSH保证优先处理
            // 忽略错误是为了保证恢复流程继续执行
            q.client.LPush(ctx, q.state.MainQueue, msgID)
            
            // 从处理中队列移除该消息
            // LRem参数说明:
            // - key: 处理中队列名称
            // - count: 1 表示只移除第一个匹配项
            // - value: 要移除的消息ID
            q.client.LRem(ctx, q.state.ProcessingQueue, 1, msgID)
        }
    }
}
2. 死信队列处理
代码语言:go
复制
// moveToDLQ 将消息移动到死信队列(DLQ)
// ctx: 上下文对象,用于传递截止时间和取消信号
// msg: 要移动到死信队列的消息对象指针
// 返回值: 错误对象,nil表示操作成功
func (q *RedisQueue) moveToDLQ(ctx context.Context, msg *QueueMessage) error {
    // 序列化消息对象为字节数组,便于Redis存储
    // 忽略序列化错误,因为即使序列化失败我们也希望继续处理
    data, _ := serializeMessage(msg)

    // 使用Redis Pipeline批量执行以下原子操作:
    // 1. 从处理中队列移除该消息
    // 2. 将消息加入死信队列
    // 3. 删除该消息的处理标记

    // 创建Pipeline对象,用于批量执行Redis命令
    pipe := q.client.Pipeline()

    // 命令1: 从处理中队列移除该消息
    // LRem命令参数:
    // - key: 处理中队列名称
    // - count: 1表示只移除第一个匹配的元素
    // - value: 要移除的消息数据
    pipe.LRem(ctx, q.state.ProcessingQueue, 1, data)

    // 命令2: 将消息加入死信队列头部
    // LPush命令将消息放入死信队列头部
    pipe.LPush(ctx, q.state.DeadLetterQueue, data)

    // 命令3: 删除该消息的处理标记
    // Del命令删除以ProcessingMarker为前缀加上消息ID构成的key
    pipe.Del(ctx, q.state.ProcessingMarker+msg.ID)

    // 执行Pipeline中的所有命令
    // Exec返回命令执行结果和可能的错误
    _, err := pipe.Exec(ctx)
    if err != nil {
        // 如果执行出错,返回包装后的错误信息
        return fmt.Errorf("failed to move message to DLQ: %w", err)
    }

    // 更新死信队列消息计数器
    // DeadLetters是Prometheus Counter类型指标
    q.metrics.DeadLetters.Inc()

    // 返回nil表示操作成功完成
    return nil
}

四、消费者模式设计

1. 批量消费者实现
代码语言:go
复制
// ConsumeBatch 批量消费队列消息
// ctx: 上下文对象,用于传递取消信号和超时控制
// handler: 消息处理器,实现MessageHandler接口
// batchSize: 每批处理的消息数量
func (q *RedisQueue) ConsumeBatch(ctx context.Context, handler MessageHandler, batchSize int) {
    // 使用无限循环持续消费消息,直到收到取消信号
    for {
        select {
        // 检查上下文是否被取消
        case <-ctx.Done():
            // 收到取消信号,退出消费循环
            return
        
        // 默认情况下继续处理消息
        default:
            // 从队列中批量获取消息
            // dequeueBatch是内部方法,负责从Redis获取一批消息
            msgs := q.dequeueBatch(ctx, batchSize)
            
            // 如果没有获取到消息
            if len(msgs) == 0 {
                // 短暂休眠避免空轮询消耗CPU
                time.Sleep(100 * time.Millisecond)
                // 跳过本次循环,继续尝试获取消息
                continue
            }
            
            // 使用WaitGroup等待所有消息处理完成
            var wg sync.WaitGroup
            
            // 并行处理本批所有消息
            for _, msg := range msgs {
                // 为每个消息增加WaitGroup计数器
                wg.Add(1)
                
                // 为每个消息启动一个goroutine并行处理
                go func(m *QueueMessage) {
                    // 确保在goroutine结束时减少WaitGroup计数器
                    defer wg.Done()
                    
                    // 调用处理器处理消息
                    if err := handler.Process(ctx, m); err != nil {
                        // 处理失败,进行重试
                        // Retry方法会根据重试策略处理消息
                        q.Retry(ctx, m)
                    } else {
                        // 处理成功,确认消息
                        // Ack方法会从处理中队列移除消息
                        q.Ack(ctx, m.ID)
                    }
                }(msg) // 注意:这里将msg作为参数传入闭包,避免循环变量捕获问题
            }
            
            // 等待本批所有消息处理完成
            wg.Wait()
        }
    }
}
2. 消息处理接口
代码语言:go
复制
// MessageHandler 定义消息处理器的接口
// 任何实现了Process方法的类型都可以作为消息处理器
type MessageHandler interface {
    // Process 处理队列中的消息
    // ctx: 上下文对象,用于传递截止时间和取消信号
    // msg: 要处理的消息对象指针
    // 返回值: 
    //   - nil 表示处理成功
    //   - 非nil 表示处理失败,需要重试
    Process(ctx context.Context, msg *QueueMessage) error
}

// ExampleHandler 是MessageHandler接口的一个示例实现
// 包含处理消息所需的依赖资源
type ExampleHandler struct {
    // 这里可以定义处理器依赖的资源
    // 例如: 数据库连接、HTTP客户端、配置参数等
    
    // db *sql.DB       // 数据库连接示例
    // client *http.Client // HTTP客户端示例
    // config Config    // 配置参数示例
}

// Process 实现MessageHandler接口的处理方法
// ctx: 上下文对象,用于控制请求的生命周期
// msg: 要处理的消息对象指针
// 返回值: 
//   - nil 表示处理成功,消息将被确认(ACK)
//   - 非nil 表示处理失败,消息将根据重试策略处理
func (h *ExampleHandler) Process(ctx context.Context, msg *QueueMessage) error {
    // 在这里实现具体的消息处理逻辑
    // 示例步骤:
    // 1. 反序列化消息体
    // 2. 执行业务逻辑(如更新数据库、调用API等)
    // 3. 处理结果判断
    
    // 示例处理流程:
    // var payload MyPayload
    // if err := json.Unmarshal(msg.Body, &payload); err != nil {
    //     return fmt.Errorf("failed to unmarshal payload: %w", err)
    // }
    //
    // if err := h.db.ExecContext(ctx, "UPDATE...", payload.Data); err != nil {
    //     return fmt.Errorf("db operation failed: %w", err)
    // }
    //
    // if err := h.client.PostJSON(ctx, "/api/update", payload); err != nil {
    //     return fmt.Errorf("API call failed: %w", err)
    // }
    
    // 返回nil表示处理成功
    return nil
    
    // 如果需要重试,可以返回错误:
    // return fmt.Errorf("temporary error, should retry")
    
    // 对于不可恢复的错误,可以使用特殊错误类型:
    // return PermanentError("invalid message format")
}

五、监控指标设计

1. 核心指标定义
代码语言:go
复制
type QueueMetrics struct {
    Enqueued      prometheus.Counter   // 入队总数
    Dequeued      prometheus.Counter   // 出队总数
    Processed     prometheus.Counter   // 处理成功数
    Retries       prometheus.Counter   // 重试次数
    DeadLetters   prometheus.Counter   // 死信数量
    QueueLength   prometheus.Gauge     // 当前队列长度
    ProcessTime   prometheus.Histogram // 处理耗时分布
    Lag           prometheus.Gauge     // 消费延迟
}

// 注册指标
func (m *QueueMetrics) Register() {
    prometheus.MustRegister(
        m.Enqueued,
        m.Dequeued,
        // ...其他指标
    )
}
2. 指标更新策略
代码语言:go
复制
// 在关键操作点更新指标
func (q *RedisQueue) processWithMetrics(ctx context.Context, msg *QueueMessage, handler MessageHandler) {
    start := time.Now()
    
    err := handler.Process(ctx, msg)
    
    // 记录处理时间
    q.metrics.ProcessTime.Observe(time.Since(start).Seconds())
    
    if err != nil {
        q.metrics.Retries.Inc()
        q.Retry(ctx, msg)
    } else {
        q.metrics.Processed.Inc()
        q.Ack(ctx, msg.ID)
    }
}

六、扩展性设计

1. 优先级队列支持
代码语言:go
复制
// PriorityQueue 定义了一个支持优先级的消息队列结构
// 使用多个Redis List分别存储不同优先级的消息
type PriorityQueue struct {
    // HighPriority 高优先级队列的Redis key名称
    // 用于存储需要优先处理的重要消息
    HighPriority string
    
    // NormalPriority 普通优先级队列的Redis key名称
    // 用于存储一般重要的消息
    NormalPriority string
    
    // LowPriority 低优先级队列的Redis key名称  
    // 用于存储可以延后处理的普通消息
    LowPriority string
    
    // client Redis客户端实例,用于执行Redis命令
    // 这里应该有一个redis.Client字段,原代码中缺失了
    client *redis.Client
}

// Enqueue 将消息按照优先级加入到对应的队列中
// priority: 消息的优先级等级,整数类型
//   - >=2 表示高优先级
//   - ==1 表示普通优先级
//   - <=0 表示低优先级
// msg: 要入队的消息对象指针
// ctx: 上下文对象,用于控制请求的生命周期(原代码中缺失此参数)
func (q *PriorityQueue) Enqueue(ctx context.Context, priority int, msg *Message) error {
    // 使用switch语句根据优先级选择目标队列
    switch {
    // 优先级大于等于2,放入高优先级队列
    case priority >= 2:
        // 使用LPUSH命令将消息放入队列头部
        // 保证高优先级消息能够被优先消费
        if err := q.client.LPush(ctx, q.HighPriority, msg).Err(); err != nil {
            return fmt.Errorf("failed to enqueue high priority message: %w", err)
        }
    
    // 优先级等于1,放入普通优先级队列    
    case priority == 1:
        // 使用LPUSH命令将消息放入队列头部
        if err := q.client.LPush(ctx, q.NormalPriority, msg).Err(); err != nil {
            return fmt.Errorf("failed to enqueue normal priority message: %w", err)
        }
    
    // 其他情况(优先级<=0),放入低优先级队列
    default:
        // 使用LPUSH命令将消息放入队列头部
        if err := q.client.LPush(ctx, q.LowPriority, msg).Err(); err != nil {
            return fmt.Errorf("failed to enqueue low priority message: %w", err)
        }
    }
    
    // 消息入队成功,返回nil
    return nil
}
2. 延迟队列实现
代码语言:go
复制
// Schedule 将消息加入延迟队列,在指定延迟时间后才会被消费
// ctx: 上下文对象,用于传递截止时间和取消信号
// msg: 要延迟的消息对象
// delay: 延迟的时间长度
// 返回值: 错误对象,nil表示操作成功
func (q *RedisQueue) Schedule(ctx context.Context, msg *Message, delay time.Duration) error {
    // 计算消息应该被执行的具体时间点
    // 当前时间加上延迟时间,转换为Unix时间戳(秒级精度)
    executeAt := time.Now().Add(delay).Unix()
    
    // 将消息添加到Redis的有序集合(Sorted Set)中:
    // - 有序集合的key为"delayed_queue"
    // - 使用执行时间戳作为score(排序分数)
    // - 消息ID作为member(集合成员)
    // ZAdd命令返回操作结果和可能的错误
    return q.client.ZAdd(ctx, "delayed_queue", &redis.Z{
        Score:  float64(executeAt),  // 转换为float64类型作为排序分数
        Member: msg.ID,             // 消息唯一标识作为集合成员
    }).Err()  // 提取错误信息返回
}

// checkDelayed 检查延迟队列中已到期的消息,并将其移入主队列
// ctx: 上下文对象,用于传递截止时间和取消信号
func (q *RedisQueue) checkDelayed(ctx context.Context) error {
    // 获取当前时间的Unix时间戳(秒级精度)
    now := time.Now().Unix()
    
    // 使用ZRangeByScore查询所有已到期的消息:
    // - 查询score从0到当前时间戳的消息(即应该已经到期的消息)
    // - Min:"0"表示最小score为0
    // - Max:当前时间戳转换为字符串
    // 忽略错误是为了保证流程继续执行
    msgs, _ := q.client.ZRangeByScore(ctx, "delayed_queue", &redis.ZRangeBy{
        Min: "0",  // 最小score
        Max: strconv.FormatInt(now, 10),  // 最大score(当前时间)
    }).Result()  // 获取字符串形式的结果
    
    // 使用Pipeline批量处理到期消息
    pipe := q.client.Pipeline()
    for _, msgID := range msgs {
        // 将到期的消息LPUSH到主队列头部,使其能够被尽快消费
        pipe.LPush(ctx, q.state.MainQueue, msgID)
        
        // 从延迟队列中移除该消息
        pipe.ZRem(ctx, "delayed_queue", msgID)
    }
    
    // 执行Pipeline中的所有命令
    // 返回可能出现的错误
    _, err := pipe.Exec(ctx)
    return err
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基于Redis实现生产级消息队列
    • 一、设计背景与目标
    • 二、整体架构设计
      • 1. 多队列协同架构
      • 2. 消息状态流转
    • 三、核心代码设计思路
      • 一、基础架构设计
      • 二、关键操作设计
      • 三、可靠性保障设计
      • 四、消费者模式设计
      • 五、监控指标设计
      • 六、扩展性设计
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档