首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >AI编程0X03〡Claude Code 编写 Redis Stream 轻量级消息队列

AI编程0X03〡Claude Code 编写 Redis Stream 轻量级消息队列

作者头像
Tinywan
发布2025-10-20 17:19:36
发布2025-10-20 17:19:36
6500
代码可运行
举报
文章被收录于专栏:开源技术小栈开源技术小栈
运行总次数:0
代码可运行

🚀 基于 Redis Streams 的高性能轻量级 PHP 队列

官方地址:https://github.tinywan.com/redis-stream

✨ 特性

  • 高性能 - 基于 Redis 5.0+ Stream,支持高并发
  • 延时任务 - 基于 Sorted Set,支持秒级到年级延时
  • 🔄 多生产者/消费者 - 支持多个生产者和消费者同时工作
  • 💾 消息持久化 - 可靠的消息持久化存储
  • ACK 确认机制 - 完善的消息确认机制
  • 🔄 智能重试 - 可配置的重试次数和延迟策略
  • 🔄 消息重放 - 支持重新处理历史消息
  • 🔍 消息审计 - 提供只读模式审计所有消息
  • 🧪 完整测试 - 69个测试,244个断言,100%通过率
  • 📝 PSR-3 日志 - 标准 PSR-3 日志接口
  • 🏗️ 单例模式 - 单例模式支持,避免重复创建实例

📋 环境要求

  • PHP >= 7.4
  • Redis >= 5.0
  • Composer >= 2.0
  • ext-redis 扩展

🚀 快速安装

代码语言:javascript
代码运行次数:0
运行
复制
composer require tinywan/redis-stream

🎯 快速开始

基础使用

创建队列实例

代码语言:javascript
代码运行次数:0
运行
复制
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Tinywan\RedisStream\RedisStreamQueue;

$queue = RedisStreamQueue::getInstance();

发送消息

代码语言:javascript
代码运行次数:0
运行
复制
<?php
$messageId = $queue->send('Hello, Redis Stream!');
echo "Message ID: $messageId\n";

消费消息

代码语言:javascript
代码运行次数:0
运行
复制
<?php
// 消费消息
$message = $queue->consume(function($message) {
    echo "Processing: " . $message['message'] . "\n";
    return true; // 确认消息
});

使用 Producer 和 Consumer

代码语言:javascript
代码运行次数:0
运行
复制
use Tinywan\RedisStream\RedisStreamQueue;
useTinywan\RedisStream\Producer;
useTinywan\RedisStream\Consumer;

// 创建队列实例
$queue = RedisStreamQueue::getInstance();

// 生产者
$producer = new Producer($queue);
$messageId = $producer->send('Task data', [
    'task_type' => 'email'
], 10); // 延迟10秒

// 消费者
$consumer = new Consumer($queue);
$consumer->run(function($message) {
    $task = json_decode($message['message'], true);
    return handleTask($task['type'], $task['data']);
});

📖 主要功能

延时消息

支持秒级到年级的任意时长延时:

代码语言:javascript
代码运行次数:0
运行
复制
// 立即执行
$queue->send('Immediate message');

// 延时执行(30秒后)
$queue->send('Delayed message', [], 30);

// 定时执行(1小时后)
$timestamp = time() + 3600;
$queue->send('Scheduled message', [], $timestamp);

// 年级延时(1天后)
$queue->send('Next day message', [], 86400);

消息重放与审计

支持重新处理历史消息和只读审计:

代码语言:javascript
代码运行次数:0
运行
复制
// 重放消息,最多处理10条,自动确认
$count = $queue->replayMessages(function($message) {
    echo "Replaying: " . $message['message'] . "\n";
    return true;
}, 10);

// 审计消息(只读模式,不影响消息状态)
$count = $queue->auditMessages(function($message) {
    echo "Auditing: " . $message['message'] . "\n";
    return true;
}, 20);

指定位置消费

灵活的消费位置控制:

代码语言:javascript
代码运行次数:0
运行
复制
// 从头开始读取所有消息
$message = $queue->consume(null, '0-0');

// 读取最新消息
$message = $queue->consume(null, '$');

// 从指定消息ID开始读取
$message = $queue->consumeFrom('1758943564547-0');

⚙️ 配置

Redis 配置

代码语言:javascript
代码运行次数:0
运行
复制
$redisConfig = [
    'host' => '127.0.0.1',
    'port' => 6379,
    'password' => null,
    'database' => 0,
    'timeout' => 5,
];

队列配置

代码语言:javascript
代码运行次数:0
运行
复制
$queueConfig = [
    'stream_name' => 'redis_stream_queue',
    'consumer_group' => 'redis_stream_group',
    'consumer_name' => 'consumer_' . getmypid(),
    'block_timeout' => 5000,
    'retry_attempts' => 3,
    'retry_delay' => 1000,
    'delayed_queue_suffix' => '_delayed',
    'scheduler_interval' => 1,
];

🚀 生产部署

Supervisor 配置

代码语言:javascript
代码运行次数:0
运行
复制
[program:redis-stream-consumer]
command=php /path/to/your/project/examples/consumer.php
directory=/path/to/your/project
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/supervisor/redis-stream-consumer.log

Docker 部署

代码语言:javascript
代码运行次数:0
运行
复制
FROM php:8.1-cli
RUN pecl install redis && docker-php-ext-enable redis
COPY --from=composer:latest /usr/bin/composer /usr/bin/composer
COPY . /app
WORKDIR /app
RUN composer install --no-dev --optimize-autoloader
CMD ["php", "examples/consumer.php"]

🔧 高级功能

单例模式管理

代码语言:javascript
代码运行次数:0
运行
复制
// 获取实例状态
$status = RedisStreamQueue::getInstancesStatus();

// 获取连接池状态
$poolStatus = $queue->getConnectionPoolStatus();

延迟队列管理

代码语言:javascript
代码运行次数:0
运行
复制
// 获取延迟队列统计
$stats = $queue->getDelayedQueueStats();

// 手动运行调度器
$processedCount = $queue->runDelayedScheduler(100);

// 启动调度器(运行60秒)
$queue->startDelayedScheduler(60);

队列监控

代码语言:javascript
代码运行次数:0
运行
复制
// 获取队列状态
$status = [
    'stream_length' => $queue->getStreamLength(),
    'pending_count' => $queue->getPendingCount(),
    'delayed_count' => $queue->getDelayedQueueLength(),
];

🧪 运行示例

代码语言:javascript
代码运行次数:0
运行
复制
# 基础示例
php examples/quickstart.php

# 生产者示例
php examples/producer.php

# 消费者示例
php examples/consumer.php

# 运行测试
./vendor/bin/phpunit
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-10-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源技术小栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ✨ 特性
  • 📋 环境要求
  • 🚀 快速安装
  • 🎯 快速开始
    • 基础使用
    • 使用 Producer 和 Consumer
  • 📖 主要功能
    • 延时消息
    • 消息重放与审计
    • 指定位置消费
  • ⚙️ 配置
    • Redis 配置
    • 队列配置
  • 🚀 生产部署
    • Supervisor 配置
    • Docker 部署
  • 🔧 高级功能
    • 单例模式管理
    • 延迟队列管理
    • 队列监控
  • 🧪 运行示例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档