Redis5.0引入了Stream类型。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:
关于stream的具体介绍可以参见:
我们基于redis stream实现了一个基础的,类似beanstalk的队列服务。用于多个无差别的消费者从一个队列消费任务的情况。如果您对stream有所了解,那么我们其实是使用了stream+group当作了beanstalk的tube。
提供最基础的功能:
/*
* 向流中添加任务
* $data: 数组形式的任务数据
* return: 任务id
*/
addTask(array $data){}
/*
* 获取任务
* $block:阻塞时间,毫秒. null不阻塞
* $count:读取条数, 只要有数据,条数不够也会立刻返回,即使设置了block。
* $start:'>'表示接受最新数据. 若设置id,则读取大于该id,且未被确认(ack)的历史任务
* 普通使用时,只要设置$block即可。
*
* return [
* 'id1' => taskdata1,
* 'id2' => taskdata2,
* ... ...
* ]
*
* 无数据返回[]
*/
getTask($block=null, $count = 1, $start = '>'){}
/*
* 根据id确认任务完成并从stream中删除该任务
* $ids: 可以是单条taskid,也可以是数组形式的多条id
*
*
* 该方法其实完成了两个动作
* ack:确认任务完成
* del:stream中删除任务
* 所以返回值中包括两个值,第一个为ack是否成功,第二个为del是否成功
*/
delTask($ids){}
<?php
/*
* 需要redis-server5.0以上
* php-redis扩展版本要适配redis-5.0
*
* 使用redis stream仿照beanstalk封装的队列服务
*/
class RedisQueue{
protected $_mRedis = null;
protected $_mStream = '';
protected $_mGroup = '';
protected $_mConsumer = '';
//默认0 不限制长度
protected $_mMaxLength = 0;
/*
* 创建队列, stream+group确认唯一队列
* $config必须包括:
* stream: stream名
* server: 格式ip:port[:auth]
*
* 可选参数:
* maxLength:队列最大长度
* group:分组名, 默认与stream相同. stream+group相当于beanstalk的tube
* consumer:消费者名, 默认与stream相同.
* */
public function __construct(array $config){
if(!isset($config['stream'])){
throw new Exception("you must config the stream");
}
$this->_mStream = $config['stream'];
if(!isset($config['server'])){
throw new Exception("you must config the server");
}
$tmp = explode(':', $config['server']);
$host = $tmp[0];
$port = $tmp[1];
$auth = $tmp[2] ?? null;
if ($host && $port){
$this->_mRedis = new Redis();
$this->_mRedis->connect($host,$port,1);
if($auth){
$this->_mRedis->auth($auth);
}
}
else{
throw new Exception("can not get redis server conf");
}
if(isset($config['maxLength'])){
$this->_mMaxLength = $config['maxLength'];
}
$this->_mGroup = $config['group'] ?? $config['stream'];
$this->_mConsumer = $config['consumer'] ?? $config['stream'];
$this->creatGroup();
}
/*
* 删除当前流(队列)
* */
public function destoryStream(){
$this->_mRedis->del($this->_mStream);
}
/*
* 向流中添加任务
* $data: array
* return: taskid
* */
public function addTask(array $data){
return $this->_mRedis->xAdd($this->_mStream, "*", $data , $this->_mMaxLength);
}
/*
* 从group中获取任务
* $block:阻塞时间,毫秒. null不阻塞
* $count:读取条数, 只要有数据,条数不够也会立刻返回,即使设置了block
* $start:'>'接受最新数据. 若设置id,则读取大于该id,且未被ack的历史任务
*
* return [
* 'id1' => taskdata1,
* 'id2' => taskdata2,
* ... ...
* ]
*
* 无数据返回[]
* */
public function getTask($block=null, $count = 1, $start = '>'){
$d = $this->_mRedis->xReadGroup($this->_mGroup, $this->_mConsumer, [$this->_mStream => $start], $count, $block);
if (is_array($d) && count($d) > 0){
return $d[$this->_mStream];
}
return $d;
}
/*
* ack任务--从pending中删除
* 同时从stream中删除
*/
public function delTask($ids){
if(!is_array($ids)){
$ids = array($ids);
}
$multi = $this->_mRedis->multi(Redis::PIPELINE);
$multi->xAck($this->_mStream, $this->_mGroup, $ids);
$multi->xDel($this->_mStream, $ids);
$res = $this->_mRedis->exec();
return $res;
}
protected function creatGroup($startID = 0){
return $this->_mRedis->xGroup('CREATE', $this->_mStream, $this->_mGroup, $startID, true);
}
}
$config = [
'server' => '10.10.10.1:6379:auth',
'stream' => 'balltube',
'consumer' => 'normalprocessor'//可以不设置
];
//创建队列
$q = new RedisQueue($config);
//添加任务
$task = ['task'=>1];
$q->addTask($task);
//获取
$timeout = 1000;
$task = $q->getTask($timeout);
//确认并删除
$taskid = key($task);
$q->delTask($taskid);
当任务被取出且未被确认时,该任务处理pending状态。beanstalk中,对于这种任务可以设置一个超时时间timeout,当任务超过timeout未被确认,该任务会被还回队列中。对于stream,应该如何处理这种任务呢?请参见:
在任务大小为1k和10k的时候,开启不同个数的进程进行10000次读/写操作,测试结果如下:
进程数 | 10 | 20 | 50 |
---|---|---|---|
redis万次读 | 1.64928s | 0.864051s | 0.542352s |
beanstalk万次读 | 1.702436s | 0.915132s | 0.503198s |
redis万次写 | 3.328083s | 1.714555s | 0.837429s |
beanstalk次写 | 3.402431s | 1.702654s | 0.9317s |
进程数 | 10 | 20 | 50 |
---|---|---|---|
redis万次读 | 1.962591s | 1.569581s | 1.001159s |
beanstalk万次读 | 3.30333s | 1.72248s | 0.940097s |
redis万次写 | 3.360724s | 1.77125s | 0.921126s |
beanstalk次写 | 3.418932s | 1.766198s | 0.823796s |
| stream | beanstalk |
---|---|---|
主从 | 支持 | 不支持 |
性能 | 相当 | 相当 |
任务持久化 | 支持 | 支持 |
任务优先级 | 不支持 | 支持 |
任务延迟 | 不支持 | 支持 |
超时任务 | 额外处理 | 自动 |
批量任务读写 | 支持 | 不支持 |