Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >think-queue 解析上

think-queue 解析上

原创
作者头像
李昊天
修改于 2019-08-09 01:55:37
修改于 2019-08-09 01:55:37
1.2K0
举报

前言

分析之前请大家务必了解消息队列的实现

如果不了解请先阅读下:

有赞消息队列设计

去哪儿网消息队列设计

tp5的消息队列是基于database redis 和tp官方自己实现的 Topthink

本章是围绕redis来做分析

存储key:

key

类型

描述

queues:queueName

list

要执行的任务

think:queue:restart

string

重启队列时间戳

queues:queueName:delayed

zSet

延迟任务

queues:queueName:reserved

zSet

执行失败,等待重新执行

执行命令

work和listen的区别在下面会解释

命令

描述

php think queue:work

监听队列

php think queue:listen

监听队列

php think queue:restart

重启队列

php think queue:subscribe

暂无,可能是保留的 官方有什么其他想法但是还没实现

行为标签

标签

描述

worker_daemon_start

守护进程开启

worker_memory_exceeded

内存超出

worker_queue_restart

重启守护进程

worker_before_process

任务开始执行之前

worker_before_sleep

任务延迟执行

queue_failed

任务执行失败

命令参数

参数

默认值

可以使用的模式

描述

queue

null

work,listen

要执行的任务名称

daemon

null

work

以守护进程执行任务

delay

0

work,listen

失败后重新执行的时间

force

null

work

失败后重新执行的时间

memory

128M

work,listen

限制最大内存

sleep

3

work,listen

没有任务的时候等待的时间

tries

0

work,listen

任务失败后最大尝试次数

模式区别

1: 执行原理不同

work: 单进程的处理模式;

无 daemon 参数 work进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会sleep一段时间然后退出;

有 daemon 参数 work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中sleep一段时间;

listen: 父进程 + 子进程 的处理模式;

会在所在的父进程会创建一个单次执行模式的work子进程,并通过该work子进程来处理队列中的下一个消息,当这个work子进程退出之后;

所在的父进程会监听到该子进程的退出信号,并重新创建一个新的单次执行的work子进程;

2: 退出时机不同

work: 看上面

listen: 所在的父进程正常情况会一直运行,除非遇到下面两种情况

01: 创建的某个work子进程的执行时间超过了 listen命令行中的--timeout 参数配置;此时work子进程会被强制结束,listen所在的父进程也会抛出一个 ProcessTimeoutException 异常并退出;

开发者可以选择捕获该异常,让父进程继续执行;

02: 所在的父进程因某种原因存在内存泄露,则当父进程本身占用的内存超过了命令行中的 --memory 参数配置时,父子进程均会退出。正常情况下,listen进程本身占用的内存是稳定不变的。

3: 性能不同

work: 是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;

listen: 是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本;

因此 work 模式的性能会比listen模式高。

注意: 当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。

4: 超时控制能力

work: 本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间;

listen: 可以限制其创建的work子进程的超时时间;

可通过 timeout 参数限制work子进程允许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束;

expire 和time的区别

expire 在配置文件中设置,指任务的过期时间 这个时间是全局的,影响到所有的work进程

timeout 在命令行参数中设置,指work子进程的超时时间,这个时间只对当前执行的listen 命令有效,timeout 针对的对象是 work 子进程;

5: 使用场景不同

work 适用场景是:

01: 任务数量较多

02: 性能要求较高

03: 任务的执行时间较短

04: 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑

listen 适用场景是:

01: 任务数量较少

02: 任务的执行时间较长

03: 任务的执行时间需要有严格限制

公有操作

由于我们是根据redis来做分析 所以只需要分析src/queue/connector/redis.php 01: 首先调用 src/Queue.php中的魔术方法 __callStatic

02: 在__callStatic方法中调用了 buildConnector

03: buildConnector 中首先加载配置文件 如果无将是同步执行

04: 根据配置文件去创建连接并且传入配置

在redis.php类的构造方法中的操作:

01: 检测redis扩展是否安装

02: 合并配置

03: 检测是redis扩展还是 pRedis

04: 创建连接对象

发布过程

发布参数

参数名

默认值

描述

可以使用的方法

$job

要执行任务的类

push,later

$data

任务数据

push,later

$queue

default

任务名称

push,later

$delay

null

延迟时间

later

立即执行

代码语言:txt
AI代码解释
复制
    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName

数组结构:

代码语言:txt
AI代码解释
复制
[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]

写入 redis并且返回队列id

至于中间的那顿骚操作太长了就没写

延迟发布

代码语言:txt
AI代码解释
复制
    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多

一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;

最后讲一下标签的使用

代码语言:txt
AI代码解释
复制
    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

undefined

代码语言:txt
AI代码解释
复制
public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }

控制台执行 php think queue:work --queue test --daemon

会在控制台一次输出

代码语言:txt
AI代码解释
复制
守护进程开启
任务延迟执行

失败的处理 如果有任务执行失败或者执行次数达到最大值

会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

代码语言:txt
AI代码解释
复制
<?php

class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i < $length; $i++) {
            $randomString .= $str[rand(0, strlen($str) - 1)];
        }
        return $randomString;
    }
}

(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

代码语言:txt
AI代码解释
复制
package main

import (
	"encoding/json"
	"github.com/garyburd/redigo/redis"
	"math/rand"
	"time"
)

type Payload struct {
	Id       string      `json:"id"`
	Job      string      `json:"job"`
	Data     interface{} `json:"data"`
	Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
	RedisClient = &redis.Pool{
		MaxIdle:     20,
		MaxActive:   500,
		IdleTimeout: time.Second * 100,
		Dial: func() (conn redis.Conn, e error) {
			c, err := redis.Dial("tcp", "127.0.0.1:6379")

			if err != nil {
				return nil, err
			}

			_, _ = c.Do("SELECT", 10)

			return c, nil
		},
	}

}

func main() {

	var data = make(map[string]interface{})
	data["id"] = "1"

	later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
	payload := createPayload(job, data)
	queueName := "queues:" + queue

	_, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

	m, _ := time.ParseDuration("+1s")
	currentTime := time.Now()
	op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
	createPayload(job, data)
	payload := createPayload(job, data)
	queueName := "queues:" + queue + ":delayed"

	_, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
	payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

	jsonStr, _ := json.Marshal(payload1)

	return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

	var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

	b := make([]rune, n)
	for i := range b {
		b[i] = str[rand.Intn(len(str))]
	}
	return string(b)
}

undefined

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
TP5系列 | Queue消息队列
1、Queue内置了 Redis,Database,Topthink ,Sync这四种驱动,本文使用Redis驱动
Tinywan
2019/08/06
4.4K0
TP5系列 | Queue消息队列
Thinkphp-queue自带的队列包使用分析
当前笔记中的内容针对的是 thinkphp-queue 的 v1.1.2 版本,现在官方已经更新到了 v1.1.3 版本, 下文中提到的几个Bug在最新的master分支上均已修复。笔记中的部分内容还未更新。
程序猿的栖息地
2022/04/29
2.2K0
Thinkphp-queue自带的队列包使用分析
PHP实现think-queue介绍
think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。think-queue消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。think-queue消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。
OwenZhang
2021/12/08
2.1K0
PHP实现think-queue介绍
解决注册并发问题并提高QPS
前言:前面在本地的windows通过apache的ab工具测试了600并发下“查询指定手机是否存在再提交数据”的注册功能会出现重复提交的情况,并且在注册完成时还需要对邀请人进行奖励,记录邀请记录,对该新用户自动发布动态信息,发短信或发邮件等其他业务功能。所以这里当并发时,注册功能就变得低效且容易出现问题。
北桥苏
2024/05/15
1240
解决注册并发问题并提高QPS
【黄啊码】TP6消息推送、队列的使用(thinkphp-queue,可实现小程序消息订阅延迟推送)
topthink/think-queue - PackagistThe ThinkPHP6 Queue Package
黄啊码
2022/01/10
2.2K0
【黄啊码】TP6消息推送、队列的使用(thinkphp-queue,可实现小程序消息订阅延迟推送)
thinkPhp使用框架自带队列think-queue
首先讲解一下何为异步消息队列: 所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者redis等等,都可以。 异步队列的作用: 个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列 转载:https://zhuanlan.zhihu.com/p/129383173
超级小可爱
2023/02/22
1.7K0
PHP高级编程之消息队列
PHP高级编程之消息队列 摘要 2015-10-19 第一版 2016-11-31 第二版 目录 1. 什么是消息队列 2. 为什么使用消息队列 3. 什么场合使用消息队列 4. 什么时候使用消息队列 5. 谁负责处理消息队列 6. 怎么实现消息队列框架 6.1. 守护进程 6.2. 消息队列协议 6.3. 消息队列处理 6.4. 测试 7. 多线程 8. 多请阅横向扩展 9. 总结 10. 延伸阅读 1. 什么是消息队列 消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的
netkiller old
2018/03/05
1.4K0
PHP高级编程之守护进程
PHP高级编程之守护进程 http://netkiller.github.io/journal/php.daemon.html 摘要 2014-09-01 发表 2015-08-31 更新 我的系列文档 Netkiller Architect 手札 Netkiller Developer 手札 Netkiller PHP 手札 Netkiller Python 手札 Netkiller Testing 手札 Netkiller Cryptography 手札 Netkiller Li
netkiller old
2018/03/05
1.1K0
php的消息队列框架resque的使用小结
前段时间在开发一个量化交易系统,这是一个类似股票交易软件的系统:股票价格变化后要实时在终端(APP)的行情价格页面实时更新,如果用户是在持仓页面,还要计算持仓盈亏及预付款比例,因为有一个强制平仓机制(当预付款比较低于30%时系统要自动触发强制平仓),因为行情波动非常快(有些产品一秒钟内价格会变化五六次),所以当某个产品的价格变化时,要触发一系统的行情推送(通过长连接)及盈亏计算操作。
风柏杨4711
2021/03/15
1.1K0
PHP高级编程之守护进程
PHP高级编程之守护进程 摘要 2014-09-01 发表 2015-08-31 更新 2015-10-20 更新,增加优雅重启 ---- 目录 1. 什么是守护进程 2. 为什么开发守护进程 3. 何时采用守护进程开发应用程序 4. 守护进程的安全问题 5. 怎样开发守护进程 5.1. 程序启动 5.2. 程序停止 5.3. 单例模式 5.4. 实现优雅重启 6. 进程意外退出解决方案 1. 什么是守护进程 守护进程是脱离于终端并且在后台运行的进程。守护进程脱离于终端是为了避免进程在执行过程中的信息在任何
netkiller old
2018/03/05
1.2K0
PHP使用topthink/think-queue消息队列实例
sudo nohup php7.2 think queue:work --daemon --queue createAdminLogQueue --tries 2 > out.file 2>&1 &
OwenZhang
2021/12/08
1.1K0
浅谈Laravel队列实现原理解决问题记录
公司项目使用Laravel的开发的两个项目在同一个测试服务器部署,公用同一个redis。在使用laravel中的队列时,产生冲突干扰。
用户8826052
2021/07/13
9510
Laravle Queue命令
php artisan queue:work --help Usage: queue:work [options] [--] [<connection>] Arguments: connection 队列连接redis、database等 Options: --queue[=QUEUE] 队列任务 --daemon 后台执行 --delay[=DELAY] 任务执行失败之后延迟多久重试 --f
苦咖啡
2018/04/28
7430
yii2-queue队列的使用
https://github.com/yiisoft/yii2-queue/blob/master/docs/guide/driver-redis.md
崔哥
2022/11/06
9450
Redis 实现高效任务队列:异步队列与延迟队列详解
文章链接:https://cloud.tencent.com/developer/article/2464880
南山竹
2024/11/12
2530
Redis 实现高效任务队列:异步队列与延迟队列详解
Laravel/Lumen 使用 redis队列
在Web开发中,我们经常会遇到需要批量处理任务的场景,比如群发邮件、秒杀资格获取等,我们将这些耗时或者高并发的操作放到队列中异步执行可以有效缓解系统压力、提高系统响应速度和负载能力。
Lansonli
2021/10/09
2.5K0
DevOps工具介绍连载(7)——Resque
作者:Amazing大龙大龙 链接:https://www.jianshu.com/p/a39904a0ba01 来源:简书
顾翔
2020/02/19
8900
TP6.0 消息队列 topthink/think-queue
topthink/think-queue 是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包
很酷的站长
2023/01/16
1.8K0
think-queue 的安装配置及使用
think-queue 是 ThinkPHP 下的一款任务队列支持组件,这次使用主要用于在项目里承担消息发送及相关操作事件的回调操作。
jwj
2022/06/02
1.9K0
think-queue 的安装配置及使用
基于 Redis 在 Laravel 中实现消息队列及底层源码探究
对应的基本工作流程是生产者(业务代码)先将消息数据推送到队列,然后再通过其他的处理进程来消费队列中的消息数据,从而实现生产者和消费者之间的解耦。因此,消息队列非常适用于一些需要异步执行的耗时操作(比如邮件发送、文件上传),或者业务临时的高并发操作(比如秒杀、消息推送),对于提升系统性能和负载非常有效,尤其是 PHP 这种本身不支持并发编程的语言,是实现异步编程的不二之选。
学院君
2021/01/08
6.5K0
相关推荐
TP5系列 | Queue消息队列
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档