Loading [MathJax]/jax/output/CommonHTML/jax.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于 Redis 消息队列实现 Laravel 事件监听及底层源码探究

基于 Redis 消息队列实现 Laravel 事件监听及底层源码探究

作者头像
学院君
发布于 2021-01-08 07:48:26
发布于 2021-01-08 07:48:26
3.7K00
代码可运行
举报
文章被收录于专栏:学院君的专栏学院君的专栏
运行总次数:0
代码可运行

在 Laravel 中,除了使用 dispatch 辅助函数通过 Illuminate\Bus\Dispatcher 显式推送队列任务外,还可以通过事件监听的方式隐式进行队列任务推送,在这个场景下,事件监听器实际上扮演了「任务类」的角色。

还是以文章浏览数更新为例。开始之前,我们先来给大家演示下事件监听和处理的基本实现。

事件监听基本使用

首先创建一个文章浏览事件类 PostViewed

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sail artisan make:event PostViewed

然后编写这个事件类代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?php

namespace App\Events;

use App\Models\Post;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class PostViewed
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public Post $post;

    /**
     * Create a new event instance.
     *
     * @param Post $post
     */
    public function __construct(Post $post)
    {
        $this->post = $post;
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return Channel|array
     */
    public function broadcastOn()
    {
        return new PrivateChannel('channel-name');
    }
}

事件类的作用就是装载事件相关的数据,这里我们引入了 Post 模型实例,以便在事件监听器中进行相应的处理,事件类中默认还有一个 broadcastOn 表示事件的广播通道,我们在后面介绍广播时再详细介绍这个方法。

有了事件之后,还要创建一个监听这个事件的处理器:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
php artisan make:listener IncreasePostViews

编写处理器代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?php

namespace App\Listeners;

use App\Events\PostViewed;
use Illuminate\Support\Facades\Redis;

class IncreasePostViews
{
    /**
     * Create the event listener.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Handle the event.
     *
     * @param PostViewed $event
     * @return void
     */
    public function handle(PostViewed $event)
    {
        if ($event->post->increment('views')) {
            Redis::zincrby('popular_posts', 1, $event->post->id);
        }
    }
}

我们将之前队列任务类的 handle 方法代码搬到了事件监听器的 handle 方法中,作为文章浏览事件发生时的处理逻辑。

要建立事件与监听器之间的映射关系,保证事件发生时可以通过监听器对其进行处理,需要在 EventServiceProvider 中维护一个监听数组配置:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected $listen = [
    ...
    PostViewed::class => [
        IncreasePostViews::class
    ],
];

以事件做键,事件监听器做值,表示一个事件可以同时被多个事件监听器监听和处理。

Laravel 还提供了事件自动发现功能,不过考虑到反射性能较差,我们这里还是使用传统的手动注册方式。

这样一来,当我们在 PostControllershow 方法中触发 PostViewed 事件时:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 浏览文章
public function show($id)
{
    $post = $this->postRepo->getById($id);
    // 触发文章浏览事件
    event(new PostViewed($post));
    return "Show Post #{$post->id}, Views: {$post->views}";
}

就会触发监听该事件的所有处理器类执行 handle 方法处理这个事件,默认情况下,事件监听器是同步执行的,所以你可以立即看到文章浏览数被更新:

基于队列处理事件监听

这只是一个更新单条数据库记录的事件处理,如果是耗时操作,比如网络请求、邮件发送、大的数据库事务等,同步处理事件监听会导致这个页面浏览要加载很长时间,降低用户体验和系统负载,所以 Laravel 还支持将事件处理推送到消息队列异步处理,提升系统性能,优化用户体验。

要让事件处理自动推送到消息队列,只需要让对应的事件监听器类和队列任务类一样实现 ShouldQueue 接口即可,为了方便与队列系统交互,你还可以使用 InteractsWithQueue Trait(这一步不是必须的):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
...
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;

class IncreasePostViews implements ShouldQueue
{
    use InteractsWithQueue;

    ...
}

如果你在创建之初就已经明确知道这个事件监听器的处理操作会推送到队列,可以在创建事件监听器的时候使用 --queued 选项:php artisan make:listener IncreasePostViews

其他代码不用做任何调整,这样,当事件触发时,对于这个实现了 ShouldQueue 接口的监听器,Laravel 会自动将其作为「任务类」推送到消息队列(默认连接、默认队列名称),如果你想要自定义队列连接、队列名称、延迟和重试之类的配置,可以像在队列任务类中一样在这个监听器类中定义 connectionqueuedelaytries 等属性:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public string $queue = 'events';

这个时候,当你刷新浏览器中的文章浏览页面时,就会发现不再执行文章浏览数更新操作了,说明这个处理操作被推送到队列系统了:

你可以在 Redis 队列 laravel_database_queues:events 中看到对应的消息数据:

这个消息数据对应的 JSON 数据如下:

其中的 data.command 反序列化后的结果如下:

其实就是 IncreasePostViews 监听器类,可以看到这些数据结构和消息队列一模一样,所以可以大胆猜测它们底层共用了同一套代码。

为了让 events 队列中的事件监听器被处理掉,运行如下命令启动消息队列处理进程:

你可以到数据库中验证 posts.id = 88 的记录,如果 views 字段值等于 97,则表明文章浏览事件被成功处理。

底层实现源码

为了一探事件监听和处理的底层实现原理,我们到 Laravel 底层查看相关的源码实现。

注册事件与对应的监听器处理逻辑

在 Laravel 应用启动过程中,会调用 App\Providers\EventServiceProviderregister 方法基于 listen 数组注册事件和监听器的映射关系:

这里的 Event 门面是在 Illuminate\Events\EventServiceProviderregister 方法中注册的 events 服务的代理:

Event::listen 调用的就是 Dispatcher 类的 listen 方法,需要注意的是这里的 Dispatcher 对应着 Illuminate\Events\Dispatcher 类,而不是队列任务分发时调用的 Illuminate\Bus\Dispatcher 类。这两个类不是同一个类,也分别实现了不同接口。

在初始化 Illuminate\Events\Dispatcher 实例时还通过 setQueueResolver 方法基于闭包函数设置了队列服务实例,如果事件处理要推送到队列,则使用这个服务实例进行操作,该闭包函数返回的服务实例正是 QueueManager 对象实例。

Illuminate\Events\Dispatcherlisten 方法中,我们得以窥见事件及对应监听器处理逻辑的注册源码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function listen($events, $listener = null)
{
    if ($events instanceof Closure) {
        return $this->listen($this->firstClosureParameterType($events), $events);
    } elseif ($events instanceof QueuedClosure) {
        return $this->listen($this->firstClosureParameterType($events->closure), $events->resolve());
    } elseif ($listener instanceof QueuedClosure) {
        $listener = $listener->resolve();
    }

    foreach ((array) $events as $event) {
        if (Str::contains($event, '*')) {
            $this->setupWildcardListen($event, $listener);
        } else {
            $this->listeners[$event][] = $this->makeListener($listener);
        }
    }
}

不论是基于闭包的,还是基于通配符的,还是基于 PHP 类的(这些示例都可以在 Laravel 事件文档中看到),在这里通通一览无余,以我们定义的 $listen 数组为例,最终所有事件类和对应监听器处理逻辑映射关系都被维护到 Illuminate\Events\Dispatcherlisteners 数组中,Dispatcher 是以单例模式绑定到服务容器的,所以 listeners 数组在启动期间一经注册完毕,在当前请求生命周期全局有效。

所有事件对应的监听器处理逻辑此时都是闭包函数,只有在对应事件被触发时才会真正执行,我们在执行时再详细剖析 makeListener 方法的底层实现。

事件触发时底层处理逻辑

event 辅助函数对应的实现代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
function event(...$args)
{
    return app('events')->dispatch(...$args);
}

这里的 app('events') 会被解析为上面的 Illuminate\Events\Dispatcher 对象实例,所以当我们通过 event 函数触发事件时,实际上调用的是 Illuminate\Events\Dispatcher 类的 dispatch 方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function dispatch($event, $payload = [], $halt = false)
{
    [$event, $payload] = $this->parseEventAndPayload(
        $event, $payload
    );

    if ($this->shouldBroadcast($payload)) {
        $this->broadcastEvent($payload[0]);
    }

    $responses = [];

    foreach ($this->getListeners($event) as $listener) {
        $response = $listener($event, $payload);

        if ($halt && ! is_null($response)) {
            return $response;
        }

        if ($response === false) {
            break;
        }

        $responses[] = $response;
    }

    return $halt ? null : $responses;
}

在这个方法中,我们首先从参数中解析出事件名和载荷数据。

载荷数据在广播时会用到,我们后面介绍广播时再详细探讨它,这里先忽略。

如果这是个广播事件,则进行广播事件推送处理,然后继续往后执行,从 listeners 数组中通过事件名解析出所有与之映射的监听器处理逻辑,由于映射的监听器处理逻辑此时都是闭包函数,所以需要调用对应的闭包函数才能真正执行这些处理逻辑:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$response = $listener($event, $payload);
不同类型监听器底层处理逻辑

我们接下来来分析 makeListener 方法底层是如何通过闭包函数封装监听器的事件处理逻辑的:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function makeListener($listener, $wildcard = false)
{
    if (is_string($listener)) {
        return $this->createClassListener($listener, $wildcard);
    }

    if (is_array($listener) && isset($listener[0]) && is_string($listener[0])) {
        return $this->createClassListener($listener, $wildcard);
    }

    return function ($event, $payload) use ($listener, $wildcard) {
        if ($wildcard) {
            return $listener($event, $payload);
        }

        return $listener(...array_values($payload));
    };
}

对于字符串类型的监听器类,它会调用 Dispatcher 类的 createClassListener 方法创建监听器类实例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function createClassListener($listener, $wildcard = false)
{
    return function ($event, $payload) use ($listener, $wildcard) {
        if ($wildcard) {
            return call_user_func($this->createClassCallable($listener), $event, $payload);
        }

        $callable = $this->createClassCallable($listener);

        return $callable(...array_values($payload));
    };
}

该方法又会调用 createClassCallable 方法对监听器类做进一步处理:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected function createClassCallable($listener)
{
    [$class, $method] = is_array($listener)
                        ? $listener
                        : $this->parseClassCallable($listener);

    if (! method_exists($class, $method)) {
        $method = '__invoke';
    }

    if ($this->handlerShouldBeQueued($class)) {
        return $this->createQueuedHandlerCallable($class, $method);
    }

    return [$this->container->make($class), $method];
}

在这个方法中,首先会解析监听器处理事件的方法,默认是 handle 方法,如果该方法不存在,则使用 __invoke 方法(所以在 IncreasePostViews 中,还可以定义 __invoke 方法替代 handle),如果监听器类实现了 ShouldQueue 接口,则调用 createQueuedHandlerCallable 方法定义将其推送到队列的闭包函数:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected function createQueuedHandlerCallable($class, $method)
{
    return function () use ($class, $method) {
        $arguments = array_map(function ($a) {
            return is_object($a) ? clone $a : $a;
        }, func_get_args());

        if ($this->handlerWantsToBeQueued($class, $arguments)) {
            $this->queueHandler($class, $method, $arguments);
        }
    };
}

这里还有一个判断逻辑,handlerWantsToBeQueued 会基于监听器类定义的 shouldQueue 方法判断当前事件监听器是否满足推送到队列执行的条件(所以可以在事件监听器类中基于这个方法实现按条件推送到队列),如果不满足也不会推送到队列,如果满足则调用 queueHandler 方法将当前事件监听器作为任务类推送到队列:

这里的队列服务实例正是从 EventServiceProvider 注册 events 服务时通过 setQueueResolver 设置的队列服务中解析出来的,最终对应的是 QueueManager 对象实例,这里可以基于事件监听器定义的 connectionqueuedelay 属性解析队列连接、名称和延迟推送时间,如果监听器类没有定义,则使用默认值,后面的实现源码想必我也不用贴出来了,参考前面消息队列底层源码即可(当前是基于 Redis 驱动的队列系统,对应的队列实现类是 RedisQueue)。

回到 createClassCallable 方法,如果当前监听器类没有实现 ShouldQueue 接口,则直接以数组形式返回当前监听器类对象实例和处理事件的方法,以 IncreasePostViews 为例,是 handle 方法。而在上一层 createClassListener 方法中,不管推送到队列还是直接执行,所有事件监听器处理逻辑最终都会通过闭包函数封装返回给上一层调用代码。

回到最上层 makeListener 方法,如果是通配符事件或者基于闭包含函数定义的监听器则在前面处理的基础上再包裹一层闭包函数返回。

至此,我们就取得了所有类型事件监听器的处理逻辑闭包函数:

  • 对于字符串类型的监听器类,如果实现了 ShouldQueue 接口,则返回推送到队列的闭包函数,否则返回直接执行监听器实例处理方法的闭包函数;
  • 对于通配符事件监听器和基于闭包的事件监听器,则在之前处理基础上在外层再包裹一层闭包函数返回。

这样,当我们在 Illuminate\Events\Dispatcher 类的 dispatch 方法中调用如下这行代码时:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$response = $listener($event, $payload);

event 和

小结

好了,到这里,你应该对 Laravel 事件监听和处理的全貌了然于胸了吧,事件的监听处理和队列推送消费很像,都是把生产者和消费者隔离,从而降低业务代码的耦合,提高系统的水平扩展性,而且事件处理部分也可以推送到队列处理,进而提升系统性能,这个时候,事件监听和处理就演化成了基于事件订阅的消息队列系统了。

本系列教程首发在学院君网站(xueyuanjun.com),你可以点击页面左下角阅读原文链接查看最新更新的教程。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 极客书房 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Laravel源码解析之事件系统
Laravel 的事件提供了一个简单的观察者实现,能够订阅和监听应用中发生的各种事件。事件机制是一种很好的应用解耦方式,因为一个事件可以拥有多个互不依赖的监听器。 laravel 中事件系统由两部分构成,一个是事件的名称,事件的名称可以是个字符串,例如 event.email,也可以是一个事件类,例如 App\Events\OrderShipped;另一个是事件的 监听器 listener,可以是一个闭包,还可以是监听类,例如 App\Listeners\SendShipmentNotification。
KevinYan
2019/10/13
1.2K0
基于 Redis 在 Laravel 中实现消息队列及底层源码探究
对应的基本工作流程是生产者(业务代码)先将消息数据推送到队列,然后再通过其他的处理进程来消费队列中的消息数据,从而实现生产者和消费者之间的解耦。因此,消息队列非常适用于一些需要异步执行的耗时操作(比如邮件发送、文件上传),或者业务临时的高并发操作(比如秒杀、消息推送),对于提升系统性能和负载非常有效,尤其是 PHP 这种本身不支持并发编程的语言,是实现异步编程的不二之选。
学院君
2021/01/08
6.7K0
基于 Redis 实现 Laravel 广播功能(上):广播事件分发和底层源码探究
在上篇教程中,学院君给大家演示了如何通过 Redis + Socket.io 实现事件消息广播功能,这是一个非常简单的实现,目的在于帮助大家熟悉实时消息广播的底层流程,今天这篇教程,我们将结合 Laravel 生态提供的广播组件和前端技术栈来搭建一个生产环境可用的、更加系统的实时消息系统。
学院君
2021/01/08
3.7K0
【Laravel系列7.5】事件系统
说到事件,你会想到什么?JS 中的回调函数,按扭的回调事件?没错,这些都是事件的应用。不过在 Laravel 中,事件是一种解耦机制,是 观察者 模式的一种体现。它能够允许你订阅和监听在你的应用中发生的各种事件。最典型的例子,当你操作完订单后,需要发送短信、邮件或者应用内通知的时候,我们一般就会使用观察者模式来实现。而事件,则是对这一操作的封装,非常方便好用。
硬核项目经理
2023/03/03
1.9K0
【Laravel系列7.5】事件系统
基于 Redis 消息队列实现邮件通知的异步发送
由于发送邮件、短信之类的操作通常涉及到第三方服务的调用,所以也是个响应时间不确定的耗时操作,如果放到处理用户请求进程中同步处理,需要等待很长时间才能获取响应结果,为了提升用户体验,可以让这些操作通过消息队列异步处理。
学院君
2021/01/22
3.3K0
Laravel5.5之事件监听、任务调度、队列
你写好任务类后,就能通过 dispatch 辅助函数来分发它了。唯一需要传递给 dispatch 的参数是这个任务类的实例: 利用模型工厂生成30个用户:
唐成勇
2019/05/26
1.5K0
PHP-web框架Laravel-队列(二)
在 Laravel 中,定义作业是通过实现 Illuminate\Contracts\Queue\Job 接口来完成的。这个接口定义了一个 fire 方法,用于处理作业的逻辑。下面是一个示例::
堕落飞鸟
2023/04/30
6850
Laravel 事件处理(event)+ 队列使用(queue)
该脚本定义了一下优化参数,执行redis中名字为FMock的队列,不输出任何信息,3秒一次,失败后3秒重新执行,最多执行3次。
躺平程序员老修
2023/09/05
6880
PHP-web框架Laravel-事件(二)
在Laravel框架中,事件监听器通常用于执行特定的操作。例如,在UserRegistered事件发生时,SendUserConfirmationEmail监听器可能会向用户发送确认电子邮件。
堕落飞鸟
2023/04/30
6610
浅析 Laravel 底层原理:契约(Contracts)
Laravel 中的契约是指框架提供的一系列定义核心服务的接口(interface)。
码农编程进阶笔记
2021/07/20
1.3K0
laravel-redis消息队列
基本的流程就是由生产者(业务代码)将数据推送到队列中(此处使用的是Redis),然后由消费者(处理程序)从队列中取出数据进行加工处理。
用户10002156
2024/01/29
3320
laravel-redis消息队列
Laravel7中Redis队列的使用
首先我们需要在配置文件中配置默认队列驱动为Redis,队列配置文件是config/queue.php:
Lansonli
2021/10/09
1.2K0
基于 Redis 消息队列实现文件上传的异步存储
本来准备给 Redis 实战入门篇做个收尾了,不过想起来 Laravel 进阶组件部分还剩下文件存储、邮件和通知这几个功能没有介绍,不如索性一并介绍下,因为它们并不是和 Redis 风马牛不相及,我们可以将这些耗时操作通过消息队列异步处理来提升页面响应速度,优化用户体验。
学院君
2021/01/22
3.9K0
Laravel 消息队列的优先级和失败任务重试实现
上篇教程发布后,有同学反馈消息队列的优先级怎么实现,Laravel 本身对此提供了支持,除此之外,Laravel 的队列组件还支持批处理、延迟推送、失败任务处理、消息队列中间件、频率限制等很多特性,一篇教程根本介绍不完,毕竟消息队列也是个很复杂的系统,但是放到这里来讲似乎又偏离了 Redis 这个主题,所以这里学院君先给大家简单介绍下消息队列优先级和失败任务处理的实现,至于更多功能特性,后面单独开一个消息队列专题进行系统介绍。
学院君
2021/01/08
2.6K0
Laravel底层学习笔记04 加载并启动ServiceProvider,事件(观察者模式)
加载并启动ServiceProvider 源码 public/index.php $kernel = $app->make(Illuminate\Contracts\Http\Kernel::class); //1. Illuminate\Contracts\Http\Kernel::class 是别名 //2. $kernel是App\Http\Kernel的实例化对象 //3. App\Http\Kernel::class继承src/Illuminate/Foundation/Http/Kernel v
用户7353560
2021/11/07
6710
PHP-web框架Laravel-事件(一)
在Laravel框架中,事件是一种用于处理应用程序中各种操作的工具。事件可以用于在某个操作执行前或执行后执行一些特定的代码。使用Laravel框架,我们可以轻松地定义和使用事件。
堕落飞鸟
2023/04/30
6480
Laravel 邮箱认证
我们修改的User实现了 MustVerifyEmailContract 接口 查看其源码 vendor/laravel/framework/src/illuminate/Contracts/Auth/MustVerifyEmail
切图仔
2022/09/14
9090
Laravel 邮箱认证
基于 Redis 实现 Laravel 广播功能(下):在私有频道和存在频道发布和接收消息
在上面的示例广播事件 UserSignedUp 中,我们通过 Channel 定义了一个公共频道广播,即所有客户端都可以接收到这个事件消息:
学院君
2021/01/08
3.5K0
Laravel/Lumen 使用 redis队列
在Web开发中,我们经常会遇到需要批量处理任务的场景,比如群发邮件、秒杀资格获取等,我们将这些耗时或者高并发的操作放到队列中异步执行可以有效缓解系统压力、提高系统响应速度和负载能力。
Lansonli
2021/10/09
2.7K0
Redis 分布式锁在 Laravel 任务调度底层实现中的应用
在 Laravel 项目中,我们可以基于任务调度功能非常轻松地管理 Crontab 定时任务,只需在 App\Console\Kernel 的 schedule 方法中定义所有需要调度的任务,类型包括 Artisan 命令、回调函数或者 Shell 脚本等:
学院君
2021/01/12
6.5K0
Redis 分布式锁在 Laravel 任务调度底层实现中的应用
相关推荐
Laravel源码解析之事件系统
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档