首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >ThinkPHP8.X〡Queue 异步任务队列应用

ThinkPHP8.X〡Queue 异步任务队列应用

作者头像
Tinywan
发布于 2025-07-24 01:41:09
发布于 2025-07-24 01:41:09
24300
代码可运行
举报
文章被收录于专栏:开源技术小栈开源技术小栈
运行总次数:0
代码可运行

Think Queue 是 ThinkPHP 官方提供的一个消息队列服务,是专门支持队列服务的扩展包。Think Queue 消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。Think Queue 消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。

支持消息队列的基本特性

  • 消息的发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等
  • 队列的多队列、内存限制、启动、停止、守护等
  • 消息队列可降级位同步执行

安装

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
composer require topthink/think-queue

使用流程

  1. 消息的消费与删除
  2. 消息的创建与推送
  3. 任务处理
  4. 任务发布

消息的消费与删除

创建RestyJob消费者类

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?php
/**
 * @desc TinywanJob
 */
declare(strict_types=1);

namespaceapp\job;

usethink\facade\Log;
usethink\queue\Job;

class TinywanJob
{
    /**
     * @desc fire 是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param $data array|mixed $data 发布任务时自定义的数据
     * @author Tinywan(ShaoBo Wan)
     */
    publicfunction fire(Job $job, $data)
    {
        // 有效消息到达消费者时可能已经不再需要执行了
        if (!$this->checkJob($data)) {
            $job->delete();
            return;
        }
        //执行业务处理
        if ($this->doJob($data)) {
            $job->delete();//任务执行成功后删除
            Log::info("dismiss job has been down and deleted");
        } else {
            //检查任务重试次数
            if ($job->attempts() > 3) {
                Log::info('dismiss job has been retried more that 3 times');
                $job->delete();
            }
        }
    }

    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param array|mixed $data 发布任务时自定义的数据
     * @return boolean 任务执行的结果
     */
    privatefunction checkJob($data)
    {
        $ts = $data["ts"];
        $bizid = $data["bizid"];
        $params = $data["params"];
        returntrue;
    }

    /**
     * 根据消息中的数据进行实际的业务处理
     */
    privatefunction doJob($data): bool
    {
        // 实际业务流程处理
        returntrue;
    }
}

消息创建与推送

在业务控制器中创建一个新消息并推送到指定的队列中

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?php
/**
 * @desc TinywanJob
 */
declare(strict_types=1);

namespaceapp\controller;

useapp\BaseController;

class Queue extends BaseController
{
    /**
     * @desc 立即执行
     * @author Tinywan(ShaoBo Wan)
     */
    publicfunction push()
    {
        // 1. 当前任务将由哪个类来负责处理。当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
        $jobHandlerClassName = \app\job\TinywanJob::class;

        // 2. 当前任务归属的队列名称,如果为新队列,会自动创建
        $jobQueueName = 'restyJobQueue';

        // 3. 当前任务所需的业务数据, 不能为 resource 类型,其他类型最终将转化为json形式的字符串。(jobData 为对象时,存储其public属性的键值对 )
        $jobData = [
            'type' => 'test',
            'record_id' => 123,
        ];

        // 4. 将该任务推送到消息队列,等待对应的消费者去执行
        $isPushed = \think\facade\Queue::push($jobHandlerClassName, json_encode($jobData, JSON_UNESCAPED_UNICODE), $jobQueueName);

        // database 驱动时,返回值为 1|false;redis 驱动时,返回值为 随机字符串|false
        if ($isPushed !== false) {
            echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ" . "<br>";
        } else {
            echo'Oops, something went wrong.';
        }
        return'push';
    }
}

这里是采用手动指定消息处理类的方式,更合理的做法是事先定义好消息名称与消费者类名的映射关系,然后根据某个可以获取该映射关系的类来推送消息,这样生产者只需要知道消息的名称,而无需指定具体哪个消费者来处理。

处理任务

切换到当前终端到项目根目录

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
php think queue:work --queue tinywanJobQueue

实际使用过程中应安装Supervisor这样的通用进程管理工具,它会监控php think queue:work的进程,一旦失败会帮助重启

简单来总结下使用流程

  • 安装Supervisor并编写应用程序配置脚本,脚本主要用来运行php think queue:work命令。
  • 运行Supervisor服务,它会读取主进程和应用程序配置。
  • 运行自己编写的消息队列并根据日志查看是否正常运行

任务发布

访问接口http://127.0.0.1:8786/queue/push查看推送是否成功

命令

Work模式 queue:work

用于启动一个工作进程来处理消息队列

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
php think queue:work --queue tinywanJobQueue

参数说明

  • --daemon 是否循环执行,如果不加该参数则该命令处理完下一个消息就退出。
  • --queue dismiss_job_queue 要处理的队列的名称
  • --delay 0 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。
  • --force 系统处于维护状态时,是否仍然处理任务,并未找到相关说明。
  • --memory 128 该进程允许使用的内存上限,以M为单位。
  • --sleep 3 如果队列中无任务则sleep多少秒后重新检查(work+daemon模式)或退出(listen或非daemon模式)
  • --tries 2 如果任务已经超过尝试次数上限,则触发“任务尝试数超限”事件,默认为0

Listen模式 queue:listen

用于启动一个listen进程,然后由listen进程通过proc_open('php think queue:work --queue="%s" --delay=%s --memory=%s --sleep=%s --tries=%s')来周期性地创建一次性的work进程来消费消息队列,并且限制该work进程的执行事件,同时通过管道来监听work进程的输出。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
php think queue:listen --queue tinywanJobQueue
  • --queue dismiss_job_queue 监听队列的名称
  • --delay 0 如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0。
  • --memory 128 该进程允许使用的内存上限,以M为单位。
  • --sleep 3 如果队列中无任务,则多长时间后重新检查。
  • --tries 0 如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0。
  • --timeout 60 工作进程允许执行的最长时间,以秒为单位。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-07-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源技术小栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装
  • 使用流程
    • 消息的消费与删除
    • 消息创建与推送
    • 处理任务
    • 任务发布
  • 命令
    • Work模式 queue:work
    • Listen模式 queue:listen
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档