Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >think-queue 解析上

think-queue 解析上

原创
作者头像
李昊天
修改于 2019-08-09 01:55:37
修改于 2019-08-09 01:55:37
1.2K0
举报

前言

分析之前请大家务必了解消息队列的实现

如果不了解请先阅读下:

有赞消息队列设计

去哪儿网消息队列设计

tp5的消息队列是基于database redis 和tp官方自己实现的 Topthink

本章是围绕redis来做分析

存储key:

key

类型

描述

queues:queueName

list

要执行的任务

think:queue:restart

string

重启队列时间戳

queues:queueName:delayed

zSet

延迟任务

queues:queueName:reserved

zSet

执行失败,等待重新执行

执行命令

work和listen的区别在下面会解释

命令

描述

php think queue:work

监听队列

php think queue:listen

监听队列

php think queue:restart

重启队列

php think queue:subscribe

暂无,可能是保留的 官方有什么其他想法但是还没实现

行为标签

标签

描述

worker_daemon_start

守护进程开启

worker_memory_exceeded

内存超出

worker_queue_restart

重启守护进程

worker_before_process

任务开始执行之前

worker_before_sleep

任务延迟执行

queue_failed

任务执行失败

命令参数

参数

默认值

可以使用的模式

描述

queue

null

work,listen

要执行的任务名称

daemon

null

work

以守护进程执行任务

delay

0

work,listen

失败后重新执行的时间

force

null

work

失败后重新执行的时间

memory

128M

work,listen

限制最大内存

sleep

3

work,listen

没有任务的时候等待的时间

tries

0

work,listen

任务失败后最大尝试次数

模式区别

1: 执行原理不同

work: 单进程的处理模式;

无 daemon 参数 work进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会sleep一段时间然后退出;

有 daemon 参数 work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中sleep一段时间;

listen: 父进程 + 子进程 的处理模式;

会在所在的父进程会创建一个单次执行模式的work子进程,并通过该work子进程来处理队列中的下一个消息,当这个work子进程退出之后;

所在的父进程会监听到该子进程的退出信号,并重新创建一个新的单次执行的work子进程;

2: 退出时机不同

work: 看上面

listen: 所在的父进程正常情况会一直运行,除非遇到下面两种情况

01: 创建的某个work子进程的执行时间超过了 listen命令行中的--timeout 参数配置;此时work子进程会被强制结束,listen所在的父进程也会抛出一个 ProcessTimeoutException 异常并退出;

开发者可以选择捕获该异常,让父进程继续执行;

02: 所在的父进程因某种原因存在内存泄露,则当父进程本身占用的内存超过了命令行中的 --memory 参数配置时,父子进程均会退出。正常情况下,listen进程本身占用的内存是稳定不变的。

3: 性能不同

work: 是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;

listen: 是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本;

因此 work 模式的性能会比listen模式高。

注意: 当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。

4: 超时控制能力

work: 本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间;

listen: 可以限制其创建的work子进程的超时时间;

可通过 timeout 参数限制work子进程允许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束;

expire 和time的区别

expire 在配置文件中设置,指任务的过期时间 这个时间是全局的,影响到所有的work进程

timeout 在命令行参数中设置,指work子进程的超时时间,这个时间只对当前执行的listen 命令有效,timeout 针对的对象是 work 子进程;

5: 使用场景不同

work 适用场景是:

01: 任务数量较多

02: 性能要求较高

03: 任务的执行时间较短

04: 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑

listen 适用场景是:

01: 任务数量较少

02: 任务的执行时间较长

03: 任务的执行时间需要有严格限制

公有操作

由于我们是根据redis来做分析 所以只需要分析src/queue/connector/redis.php 01: 首先调用 src/Queue.php中的魔术方法 __callStatic

02: 在__callStatic方法中调用了 buildConnector

03: buildConnector 中首先加载配置文件 如果无将是同步执行

04: 根据配置文件去创建连接并且传入配置

在redis.php类的构造方法中的操作:

01: 检测redis扩展是否安装

02: 合并配置

03: 检测是redis扩展还是 pRedis

04: 创建连接对象

发布过程

发布参数

参数名

默认值

描述

可以使用的方法

$job

要执行任务的类

push,later

$data

任务数据

push,later

$queue

default

任务名称

push,later

$delay

null

延迟时间

later

立即执行

代码语言:txt
AI代码解释
复制
    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一顿骚操作后返回一个数组 并且序列化后 rPush到redis中 key为 queue:queueName

数组结构:

代码语言:txt
AI代码解释
复制
[
    'job' => $job, // 要执行任务的类
    'data' => $data, // 任务数据
    'id'=>'xxxxx' //任务id
]

写入 redis并且返回队列id

至于中间的那顿骚操作太长了就没写

延迟发布

代码语言:txt
AI代码解释
复制
    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多

一顿骚操作后返回一个数组 并且序列化后 zAdd 到redis中 key为 queue:queueName:delayed score为当前的时间戳+$delay

执行过程

执行过程有work模式和listen模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;

最后讲一下标签的使用

代码语言:txt
AI代码解释
复制
    //守护进程开启
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //内存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重启守护进程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任务开始执行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任务延迟执行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任务执行失败
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

undefined

代码语言:txt
AI代码解释
复制
public function run(Output $output)
    {
        $output->write('<info>任务执行失败</info>', true);
    }

控制台执行 php think queue:work --queue test --daemon

会在控制台一次输出

代码语言:txt
AI代码解释
复制
守护进程开启
任务延迟执行

失败的处理 如果有任务执行失败或者执行次数达到最大值

会触发 queue_failed

app\index\behavior@run方法里面写失败的逻辑 比如邮件通知 写入日志等

最后我们来说一下如何在其他框架或者项目中给tp的项目推送消息队列,例如两个项目是分开的 另一个使用的却不是tp5的框架

在其他项目中推任务

php版本

代码语言:txt
AI代码解释
复制
<?php

class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i < $length; $i++) {
            $randomString .= $str[rand(0, strlen($str) - 1)];
        }
        return $randomString;
    }
}

(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

代码语言:txt
AI代码解释
复制
package main

import (
	"encoding/json"
	"github.com/garyburd/redigo/redis"
	"math/rand"
	"time"
)

type Payload struct {
	Id       string      `json:"id"`
	Job      string      `json:"job"`
	Data     interface{} `json:"data"`
	Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
	RedisClient = &redis.Pool{
		MaxIdle:     20,
		MaxActive:   500,
		IdleTimeout: time.Second * 100,
		Dial: func() (conn redis.Conn, e error) {
			c, err := redis.Dial("tcp", "127.0.0.1:6379")

			if err != nil {
				return nil, err
			}

			_, _ = c.Do("SELECT", 10)

			return c, nil
		},
	}

}

func main() {

	var data = make(map[string]interface{})
	data["id"] = "1"

	later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
	payload := createPayload(job, data)
	queueName := "queues:" + queue

	_, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

	m, _ := time.ParseDuration("+1s")
	currentTime := time.Now()
	op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
	createPayload(job, data)
	payload := createPayload(job, data)
	queueName := "queues:" + queue + ":delayed"

	_, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 创建指定格式的数据
func createPayload(job string, data interface{}) (payload string) {
	payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

	jsonStr, _ := json.Marshal(payload1)

	return string(jsonStr)
}

// 创建随机字符串
func random(n int) string {

	var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

	b := make([]rune, n)
	for i := range b {
		b[i] = str[rand.Intn(len(str))]
	}
	return string(b)
}

undefined

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spring Boot + Security-01基于内存认证
代码地址:https://github.com/zhangpu1211/Spring-Security-Demos
听城
2020/02/24
7710
【Spring Boot】011-SpringSecurity(安全)
Spring Security 是 Spring 家族中的一个安全管理框架,实际上,在 Spring Boot 出现之前,Spring Security 就已经发展了多年了,但是使用的并不多,安全管理这个领域,一直是 Shiro 的天下。
訾博ZiBo
2025/01/06
850
【Spring Boot】011-SpringSecurity(安全)
Spring Security 认证的三种方式及简单的授权
在pom.xml文件映入SpringSecutrity依赖启动器,启动项目,访问文章列表页面时,出现默认的登录页,需要用默认用户名:user,密码源于控制台输出,也就是最基础的登录
慕容千语
2020/12/18
3.9K0
Spring学习笔记(三十四)——Springboot集成Spring Security
spring security 是基于 spring 的安全框架。它提供全面的安全性解决方案,同时在 Web 请求级和方法调用级处理身份确认和授权。在 Spring Framework 基础上,spring security 充分利用了依赖注入(DI)和面向切面编程(AOP)功能,为应用系统提供声明式的安全访问控制功能,减少了为企业系统安全控制编写大量重复代码的工作。是一个轻量级的安全框架。它与 Spring MVC 有很好地集成.
不愿意做鱼的小鲸鱼
2022/09/26
1.6K0
Spring学习笔记(三十四)——Springboot集成Spring Security
Spring Security 简单了解使用
Spring Security 是一个基于 Spring 框架的安全框架,提供了一套安全性认证和授权的解决方案,用于保护 Web 应用程序和服务。
Jimmy_is_jimmy
2023/08/02
2920
Spring Security 简单了解使用
SpringSecurity的基础使用
Spring Security is a powerful and highly customizable authentication and access-control framework. It is the de-facto standard for securing Spring-based applications.
暴躁的程序猿
2022/03/24
5190
SpringSecurity的基础使用
Spring Security权限框架理论与简单Case
Spring Security 提供了基于javaEE的企业应用软件全面的安全服务。这里特别强调支持使用Spring框架构件的项目,Spring框架是企业软件开发javaEE方案的领导者。如果你还没有使用Spring来开发企业应用程序,我们热忱的鼓励你仔细的看一看。熟悉Spring特别是一来注入原理两帮助你更快更方便的使用Spring Security。
端碗吹水
2020/09/23
7900
Spring Security权限框架理论与简单Case
Spring Framework 学习笔记(4) Spring Security
Spring Secrity 能够在Web请求级别和方法调用级别处理身份认证和授权。
张云飞Vir
2021/07/13
3320
Spring Security入门案例
IoC/DI和AOP功能,为系统提供了声明式安全访问控制功能,减少了为系统安全而编写大量重复代码的工作。主要包含如下几个重要的内容:
ruochen
2021/11/25
1.3K0
Spring Security权限控制
Spring Security官网 : https://projects.spring.io/spring-security/
二十三年蝉
2022/03/10
1.6K0
Spring Security权限控制
Spring boot项目集成security
在进行框架选型时最常用的选择就是在Spring security 和Shiro中进行抉择,Spring security 和 shiro 一样,都具有认证、授权、加密等用于权限管理的功能。但是对于Springboot而言,Spring Security比Shiro更合适一些,他们都是Spring生态里的内容,并且在使用上Spring boot只需要引入Security就可以实现基础的登陆验证。
余生大大
2022/11/02
3820
Spring boot项目集成security
springboot系列学习(二十四):springboot项目里面整合spring Security框架。一步一步带你整合使用,小白必看(一)
我们的一个普通项目,没有安全的限制也是可以使用的,但是在公司里面,安全就是必须的,不是说非要使用安全框架springsecurity框架。之前我们学过的过滤器,拦截器也是可以实现一定的项目的安全。
一写代码就开心
2020/11/20
8080
springboot系列学习(二十四):springboot项目里面整合spring  Security框架。一步一步带你整合使用,小白必看(一)
Spring 全家桶之 Spring Security(一)
&emsp;&emsp;Spring Security是基于Spring的安全框架,Spring Security提供全面的安全性解决方案,同时在Web Request和Method处理身份认证和授权,在Spring Framework基础上,Spring Security充分利用了Soring的 DI和AOP特性,为应用系统提供了声明式的安全访问控制功能,是一个轻量级的框架,可以很好的与Spring及Spring MVC集成
RiemannHypothesis
2022/08/19
5650
Spring 全家桶之 Spring Security(一)
05 Spring Boot 整合Spring Security
整合Spring Security 整合方法 创建项目时选择security依赖或在pom中添加security依赖 建立SpringSecurityConfig类,继承WebSecurityConfigurerAdapter方法 在刚刚创建的类上添加@EnableWebSecurity注解 设置授权规则 @Override protected void configure(HttpSecurity http) throws Exception { http.authorize
shimeath
2020/07/31
3870
Spring Security 表单登录
本文将重点介绍使用 SpringSecurity登录。 本文将构建在之前简单的Spring MVC示例之上,因为这是设置Web应用程序和登录机制的必不可少的。
乱敲代码
2019/06/26
1.7K0
【Spring Security】003-Spring Security web权限方案(1):用户认证
在resources目录下创建static目录,并创建login.html,name必须是username和password;
訾博ZiBo
2025/01/06
1020
【Spring Security】003-Spring Security web权限方案(1):用户认证
SpringSecurity的使用
我们使用SpringSecurity、shiro两个框架是为了更加简洁的实现安全。
用户11097514
2024/05/30
1560
SpringSecurity的使用
Spring Security 快速了解
我曾经使用 Interceptor 实现了一个简单网站Demo的登录拦截和Session处理工作,虽然能够实现相应的功能,但是无疑Spring Security提供的配置方法更加简单明确,能够更好的保护Web应用。
Rekent
2018/09/04
5580
Spring Security 快速了解
SpringBoot与安全(Spring Security)
​ SpringSecurity 是针对 Spring 项目的安全框架,也是 Spring Boot 底层安全模块的技术选项。他可以实现强大的 web 安全控制。对于安全控制,我们需要引入 spring-boot-starter-securiy 模块。
OY
2022/03/12
7520
SpringBoot与安全(Spring Security)
学习学习SpringSecurity
SpringSecurity是Spring下的一个安全框架,与shiro 类似,一般用于用户认证(Authentication)和用户授权(Authorization)两个部分,常与与SpringBoot相整合。
mySoul
2020/06/07
6100
相关推荐
Spring Boot + Security-01基于内存认证
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档