前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >让Laravel/Lumen队列消费Non-Laravel queue job

让Laravel/Lumen队列消费Non-Laravel queue job

原创
作者头像
andychai
发布2019-03-08 17:51:29
2.6K0
发布2019-03-08 17:51:29
举报
文章被收录于专栏:andychai

如何让Laravel/Lumen作为消费者处理非Laravel/Lumen生产的消息?

一句话概括需求就是:Allow Laravel to process non-laravel queue job.

小伙伴们应该都清楚在Laravel中的队列体系,是把实现了你的Job类进行序列化之后在队列中传输,消费者一方通过反序列化恢复对象,所以在Job类中我们可以完整传递信息,如Eloquent\Model 等,但是如果生产者不是Laravel/Lumen体系的服务,投递到队列的消息也不是Queueable的对象,那Laravel Queue就无法正常解析,并且抛出异常。

例子可看下方代码

代码语言:txt
复制
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob;

class GatewayJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle(RabbitMQJob $job, array $data)
    {
        dump($data);
        dump($job->getRawBody());
        $job->delete();
    }
}

业务背景

AWP物理架构.png
AWP物理架构.png

从上图可以看到,我们的业务通过微信网关(swoole)接受微信开放平台消息,在根据业务路由规则分发到下游服务,其中转发消息分为实时异步实时就不说了,异步就是微信网关将消息投递到消息队列(RabbitMQ),最后由消费者(Laravel)进行消息处理。

解释几个大家可能会问的问题:

为什么架构图中有两个网关?

  • 基于OpenRestryKong网关用于处理用户h5侧的请求分发,基于下游服务大部分是swoole实现的内存常驻性,借助Kong API网关的优势:动态路由,健康检查,限流,可开发插件(e.g.Cl5,秒级监控),更有助于我们把控服务流量入口。
  • 基于swoole实现的微信网关,只专注于接受微信开放平台的密文,解密后分发至各个需要的下游服务。

为什么消费者是Laravel?

Laravel作为我们整套微服务体系的管理后台,既然是管理后台,当然还是单体式开发更舒适。再者管理后台已经聚集了所有数据对象的操作模型,那写消费者逻辑就更高效。至于消费者进程的运行方式是Supervisor+Laravel Queue,本身就是内存常驻型+KeepAlived,不担心传统LNMP架构的效率问题。

微信消息异步通信.png
微信消息异步通信.png

能不能直接写Laravel Command替代?

可以,但不优雅,不喜欢!


其实对于Allow Laravel to process non-laravel queue job这个问题还是比较有普遍性,毕竟生产者和消费者不是用一个框架,甚至不同语种都是很正常的。网上就有人问:“我的生产者是NodeJS,消费者是Laravel。。。不知道该怎么办。”

解决办法

Illuminate\Queue\Jobs\Job类中的fire方法一直往下跟,你就会得到答案:

代码语言:txt
复制
/**
     * Fire the job.
     *
     * @return void
     */
    public function fire()
    {
        $payload = $this->payload();

        list($class, $method) = JobName::parse($payload['job']); // 在这里解析队里中的Job格式

        with($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }

最后追到Illuminate\Support\Traits\Macroable\Str中的parseCallback方法你就得到答案了:

代码语言:txt
复制
/**
     * Parse a Class@method style callback into class and method.
     *
     * @param  string  $callback
     * @param  string|null  $default
     * @return array
     */
    public static function parseCallback($callback, $default = null)
    {
        return static::contains($callback, '@') ? explode('@', $callback, 2) : [$callback, $default];
    }

解释

假设我想在队列中传输数据,指定消费者为App\Jobs\GatewayJob类的handle方法处理,那么能够让Laravel正确解析的数据结构(json)为:

代码语言:txt
复制
{
  "job": "App\\Jobs\\GatewayJob@handle",
  "data": {
    "payload": {
      "ToUserName": "gh_47a4043b185f",
      "FromUserName": "o9Le4wyQ08reKM2bn6evr1--YpOE",
      "CreateTime": "1551346579",
      "MsgType": "text",
      "Content": "kok波洛克",
      "MsgId": "22209944230247159"
    },
    "extra": {
      "wx-appid": "wxcc5d27e1c808d79c",
      "timestamp": 1551346579,
      "area": "1",
      "platid": "0",
      "partition": "10071",
      "charac-name": "andychai",
      "charac-no": "2299733847",
      "unionid": "o9mcu0yK2GqgMJ9ZOKBV9XOzdC9A",
      "has-game-role": 1
    }
  }
}

job

data

声明处理Job的类名和方法

真实传输的数据

在回到我们之前make好的GatewayJob中看:

代码语言:txt
复制
/**
     * @param RabbitMQJob $job Job父类本身(这里是RabbitMQJob的子类实现)
     * @param array $data 真实数据
     */
    public function handle(RabbitMQJob $job, array $data)
    {
        dump($data);
        dump($job->getRawBody());
        $job->delete();
    }

结束语

唯一让人不爽的是,生产者这一侧非要知道消费者的Job类才能够正常传递,感觉上有点怪怪的。但由于我们的微信网关的路由配置,本身也是能够在管理端动态配置,并且实时生效,所以这个问题也就是多加一个字段就解决了。

期待你有更好更优雅的方案!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 如何让Laravel/Lumen作为消费者处理非Laravel/Lumen生产的消息?
    • 业务背景
      • 解释几个大家可能会问的问题:
        • 为什么架构图中有两个网关?
        • 为什么消费者是Laravel?
        • 能不能直接写Laravel Command替代?
      • 解决办法
        • 解释
      • 结束语
        • 期待你有更好更优雅的方案!
        相关产品与服务
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档