Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于 Redis 实现 Laravel 广播功能(上):广播事件分发和底层源码探究

基于 Redis 实现 Laravel 广播功能(上):广播事件分发和底层源码探究

作者头像
学院君
发布于 2021-01-08 07:49:38
发布于 2021-01-08 07:49:38
3.7K00
代码可运行
举报
文章被收录于专栏:学院君的专栏学院君的专栏
运行总次数:0
代码可运行

上篇教程中,学院君给大家演示了如何通过 Redis + Socket.io 实现事件消息广播功能,这是一个非常简单的实现,目的在于帮助大家熟悉实时消息广播的底层流程,今天这篇教程,我们将结合 Laravel 生态提供的广播组件和前端技术栈来搭建一个生产环境可用的、更加系统的实时消息系统。

这里使用的技术栈是基于 Redis 驱动的 Laravel 广播组件 + 封装了 Socket.io 服务端的 Laravel Echo Server + 封装了 Socket.io 客户端的 Laravel Echo,底层的基本流程其实还是和上篇教程所演示的一样,只是在其基础上封装了更复杂的业务功能,下面我们先来搭建这个广播系统并分析其底层实现源码,再演示上层支持的各种业务功能。

Laravel 后端配置

要使用 Laravel 提供的广播组件,需要在 config/app.php 中取消 BroadcastServiceProvider 前面的注释:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
'providers' => [
    ...

    App\Providers\BroadcastServiceProvider::class,

    ...
],

以便可以在应用启动时加载广播相关路由

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function boot()
{
    Broadcast::routes();

    require base_path('routes/channels.php');
}

channels.php 中的路由和 web.php 中的路由不同,前者是基于 Websocket 协议进行通信的,后者是基于 HTTP 协议进行通信的。

和缓存、队列一样,广播也支持多种驱动,比如 Pusher、Redis,我们可以在 .env 通过设置 BROADCAST_DRIVER 来配置广播驱动,这里将其配置为 Redis:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
BROADCAST_DRIVER=redis

至此,服务端配置工作就完成了。

定义广播事件类

Laravel 支持通过分发广播事件的方式来发布消息(上篇教程我们通过数组模拟了事件消息),要创建广播事件,使用如下 Artisan 命令即可:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
php artisan make:event UserSignedUp

如果要让 Laravel 分发事件时以广播形式推送,需要让其实现 ShouldBroadcast 接口,我们编写 UserSignedUp 这个广播事件类实现如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?php

namespace App\Events;

use App\Models\User;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class UserSignedUp implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public User $user;
    public string $broadcastQueue = 'broadcast';

    /**
     * Create a new event instance.
     *
     * @param User $user
     */
    public function __construct(User $user)
    {
        $this->user = $user;
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return Channel|array
     */
    public function broadcastOn()
    {
        return new Channel('test-channel');
    }
}

我们将上篇教程中以数组形式模拟的事件消息数据转化为了广播事件类,事件负荷数据通过属性形式设置,并且在 broadcastOn 方法中定义了事件消息将被推送到的频道,以及通过 broadcastQueue 属性指定了事件消息如果被推送到队列的话对应的队列名称。

广播事件类和普通的事件类基本结构是一样的,只是在其基础上实现了 ShouldBroadcast 接口表示这是个广播事件,然后通过 broadcastOn 方法定义了广播频道,你可以基于 InteractsWithSockets 提供的方法进行一些 Websocket 设置,还可以定义一些其他的方法和属性用于设置该事件的广播和推送到消息队列的行为,这些方法和属性稍后会在事件分发底层实现中看到。

广播事件分发及底层实现

和普通事件类一样,广播事件也要通过分发进行处理。我们可以在应用的任何地方分发广播事件,为了简化演示,我们将上篇教程编写的 RedisPublish 命令执行代码改为分发广播事件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function handle()
{
    $user = \App\Models\User::find(1);
    event(new UserSignedUp($user));
}

和普通事件类不同的是,广播事件无需注册对应的事件监听器定义处理逻辑,如果实现了 ShouldBroadcast 接口分发广播事件会将其推送到 Laravel 当前使用的消息队列系统进行异步处理,如果实现了 ShouldBroadcastNow 接口则立即广播这个事件,如果没有实现这些接口就不是广播事件,按照普通事件类处理。

我们来看看广播事件分发的底层实现,和普通事件一样,最终也是通过 Illuminate\Events\Dispatcherdispatch 分发处理的,我们注意到其中包含这段广播事件处理代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if ($this->shouldBroadcast($payload)) {
    $this->broadcastEvent($payload[0]);
}

payload 是通过数组包裹的传入 dispatch 方法的事件实例数据,因此 payload[0] 也就是事件实例本身了,这里的 shouldBroadcast 方法用于判断当前事件是否需要广播,判断依据如下:

这个事件实例是否实现了 ShouldBroadcast 接口,以及如果事件类中定义了 broadcastWhen 方法,条件是否为 true(没有定义的话默认返回为 true),这两个条件同时满足才会广播,对应的实现源码位于 shouldBroadcast 方法中,非常简单,源码就不贴出来了。

如果需要广播,则调用 broadcastEvent 方法广播这个事件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected function broadcastEvent($event)
{
    $this->container->make(BroadcastFactory::class)->queue($event);
}

里面的这行代码最终会调用 Illuminate\Broadcasting\BroadcastManagerqueue 方法(相关服务容器绑定和别名设置位于 Illuminate\Broadcasting\BroadcastServiceProviderregister 方法中)进行广播事件的处理:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function queue($event)
{
    if ($event instanceof ShouldBroadcastNow) {
        return $this->app->make(BusDispatcherContract::class)->dispatchNow(new BroadcastEvent(clone $event));
    }

    $queue = null;

    if (method_exists($event, 'broadcastQueue')) {
        $queue = $event->broadcastQueue();
    } elseif (isset($event->broadcastQueue)) {
        $queue = $event->broadcastQueue;
    } elseif (isset($event->queue)) {
        $queue = $event->queue;
    }

    $this->app->make('queue')->connection($event->connection ?? null)->pushOn(
        $queue, new BroadcastEvent(clone $event)
    );
}
立即广播事件消息

如果广播事件实现了 ShouldBroadcastNow 接口,则通过 Illuminate\Bus\DispatcherdispatchNow 方法立即进行处理,最终执行的是 BroadcastEvent 实例的 handle 方法将其进行广播:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public function handle(Broadcaster $broadcaster)
{
    $name = method_exists($this->event, 'broadcastAs')
            ? $this->event->broadcastAs() : get_class($this->event);

    $broadcaster->broadcast(
        Arr::wrap($this->event->broadcastOn()), $name,
        $this->getPayloadFromEvent($this->event)
    );
}

事件名默认是类名,如果事件类定义了 broadcastAs 方法,则以其返回值作为事件名。接下来,就是调用 broadcaster->broadcast 方法广播这个事件了,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$this->app->singleton(BroadcasterContract::class, function ($app) {
    return $app->make(BroadcastManager::class)->connection();
});

也就是调用 BroadcastManager 实例的 connection 方法返回的广播驱动连接实例,我们这里的 BROADCAST_DRIVER 配置值为 redis,所以最终调用 createRedisDriver 返回 RedisBroadcaster 实例作为广播驱动实例。所以上面的 $broadcaster->broadcast 最终执行的是 RedisBroadcasterbroadcast 方法:

第一个参数是频道,以 UserSignedUp 事件为例,就是通过 broadcastOn 方法返回的 test-channel,频道参数不能为空,否则会退出,第二个参数是事件名,第三个参数是事件负荷数据,也就是基于 BroadcastEventgetPayloadFromEvent 处理后的事件消息数据,其中包含事件本身的属性数据。

broadcast 方法中,会将事件名和事件负荷数据一起封装到最终的 $payload 中,然后通过 Redis 连接,通过 PUBLISH 指令发布这个事件消息(在 broadcastMultipleChannelsScript 方法中包含了相应的 LUA 脚本)。

你看,经历这么多重重「关卡」,最终落地的还不是一个 Redis PUBLISH 指令。如果在 Websocket 服务器中通过 Redis 订阅了 test-channel 这个频道,就可以接收到这个消息,然后将其广播给所有建立连接的 Websocket 客户端了。

将事件消息推送到队列

不过细心的同学可能已经注意到 Illuminate\Events\DispatchershouldBroadcast 方法并没有针对是否实现 ShouldBroadcastNow 接口做判断,因此目前 Laravel 只是底层支持了立即广播事件消息,上层业务是不支持的,所以回到 Illuminate\Broadcasting\BroadcastManagerqueue 方法,我们继续往下看:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
$queue = null;

if (method_exists($event, 'broadcastQueue')) {
    $queue = $event->broadcastQueue();
} elseif (isset($event->broadcastQueue)) {
    $queue = $event->broadcastQueue;
} elseif (isset($event->queue)) {
    $queue = $event->queue;
}

$this->app->make('queue')->connection($event->connection ?? null)->pushOn(
    $queue, new BroadcastEvent(clone $event)
);

接下来,就是将事件消息推送到队列系统的操作了,首先获取队列名称,如果事件类定义了 broadcastQueue 方法,则将其返回值作为队列名称,否则使用事件实例上的 broadcastQueue 或者 queue 属性值作为队列名称,如果以上都没有设置,则只能使用默认的 default 作为队列名称了,这里我们设置了 broadcastQueue 属性,所以会被推送到 broadcast 这个队列。

最后,就是调用队列连接(根据当前配置,默认使用的是 Redis 连接,你也可以通过在事件类中设置 connection 属性指定其他队列连接)的 pushOn 方法推送封装了当前事件的 BroadcastEvent 实例到队列系统了,最终执行的就是位于 RedisQueue 中的 push 方法,我们前面介绍队列系统时已经详细介绍过这块的底层实现,这里就不再重复了。

基于前面事件监听和处理的底层实现分析,我们也可以预判,当启动队列处理器处理 broadcast 队列时,会按照上面立即广播事件消息的方式,基于 Illuminate\Bus\DispatcherdispatchNow 方法处理 BroadcastEvent,即执行其 handle 方法通过 RedisBroadcasterbroadcast 方法使用 Redis PUBLISH 指令发布消息。

所以虽然广播事件没有定义显式的事件监听器,但是底层其实是通过 BroadcastEvent 作为统一的广播事件监听器来处理所有广播事件的。所以啊,广播事件的处理是 Laravel 框架事件监听和消息队列的集大成者,了解它的底层实现,也就等于搞懂了所有这几个组件的实现原理。

本系列教程首发在学院君网站(xueyuanjun.com),你可以点击页面左下角阅读原文链接查看最新更新的教程。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 极客书房 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
基于 Redis 实现 Laravel 广播功能(下):在私有频道和存在频道发布和接收消息
在上面的示例广播事件 UserSignedUp 中,我们通过 Channel 定义了一个公共频道广播,即所有客户端都可以接收到这个事件消息:
学院君
2021/01/08
3.5K0
基于 Redis 消息队列实现 Laravel 事件监听及底层源码探究
在 Laravel 中,除了使用 dispatch 辅助函数通过 Illuminate\Bus\Dispatcher 显式推送队列任务外,还可以通过事件监听的方式隐式进行队列任务推送,在这个场景下,事件监听器实际上扮演了「任务类」的角色。
学院君
2021/01/08
3.7K0
【Laravel系列7.8】广播系统
广播系统指的是什么呢?在这里我们说的广播系统其实就是配合 WebSocket 实现的即时更新接口。什么意思呢?比如说在你的购物 App 上,如果订单状态发生了变化,比如卖家发货了,那么马上就会收到一条通知信息。当然,App 上使用的不是 WebSocket ,而是不同平台的推送机制,但它也是一种广播通知机制。如果你对 Redis 比较了解的话,也可以这么理解:它和 Redis 中的 Pub/Sub 也非常像,前端 SUBSCRIBE 监听频道,后端向频道里 PUBLISH 数据,就是这么个过程。
硬核项目经理
2023/03/03
2.6K0
【Laravel系列7.8】广播系统
Laravel 广播系统工作原理
今天,让我们深入研究下 Laravel 的广播系统。广播系统的目的是用于实现当服务端完成某种特定功能后向客户端推送消息的功能。本文我们将学习如何使用第三方 Pusher 工具向客户端推送消息的功能。
柳公子
2018/09/17
9.6K1
Laravel 广播系统工作原理
Laravel 事件处理(event)+ 队列使用(queue)
该脚本定义了一下优化参数,执行redis中名字为FMock的队列,不输出任何信息,3秒一次,失败后3秒重新执行,最多执行3次。
躺平程序员老修
2023/09/05
6800
基于 Pusher 驱动的 Laravel 事件广播(上)
说明:本文主要来源于Building Real-Time Laravel Apps with Pusher。
botkenni
2022/01/10
3.5K0
基于 Pusher 驱动的 Laravel 事件广播(上)
基于 Redis 在 Laravel 中实现消息队列及底层源码探究
对应的基本工作流程是生产者(业务代码)先将消息数据推送到队列,然后再通过其他的处理进程来消费队列中的消息数据,从而实现生产者和消费者之间的解耦。因此,消息队列非常适用于一些需要异步执行的耗时操作(比如邮件发送、文件上传),或者业务临时的高并发操作(比如秒杀、消息推送),对于提升系统性能和负载非常有效,尤其是 PHP 这种本身不支持并发编程的语言,是实现异步编程的不二之选。
学院君
2021/01/08
6.7K0
php-laravel Redis 广播
在很多现代 Web 应用中,WebSockets被用于实现实时更新的用户接口。当一些数据在服务器上
iwhao
2024/07/05
4830
Laravel学习教程之广播模块详解
前言 本文主要给大家介绍了关于Laravel广播模块的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍: 注意:本文是基于Laravel 5.4版本的路由模块代码进行分析书写; 简介
用户2323866
2021/07/01
1.7K0
基于 Redis 实现 Laravel 广播功能(中):引入 Laravel Echo 接收广播消息
上篇教程我们完成了广播系统的后端配置和事件分发,并探究了底层源码的实现,最终落地的都是通过 Redis 发布命令发布消息。
学院君
2021/01/08
4.1K0
Laravel 广播
安装请移步 https://www.cuiwei.net/p/1659113677
崔哥
2023/03/24
2.8K0
Laravel 广播
Laravel结合Pusher实现数据实时推送
找到 app/Console/Commands/PusherEventCommand.php
素描
2021/11/12
5070
【Laravel系列7.5】事件系统
说到事件,你会想到什么?JS 中的回调函数,按扭的回调事件?没错,这些都是事件的应用。不过在 Laravel 中,事件是一种解耦机制,是 观察者 模式的一种体现。它能够允许你订阅和监听在你的应用中发生的各种事件。最典型的例子,当你操作完订单后,需要发送短信、邮件或者应用内通知的时候,我们一般就会使用观察者模式来实现。而事件,则是对这一操作的封装,非常方便好用。
硬核项目经理
2023/03/03
1.9K0
【Laravel系列7.5】事件系统
Laravel源码解析之事件系统
Laravel 的事件提供了一个简单的观察者实现,能够订阅和监听应用中发生的各种事件。事件机制是一种很好的应用解耦方式,因为一个事件可以拥有多个互不依赖的监听器。 laravel 中事件系统由两部分构成,一个是事件的名称,事件的名称可以是个字符串,例如 event.email,也可以是一个事件类,例如 App\Events\OrderShipped;另一个是事件的 监听器 listener,可以是一个闭包,还可以是监听类,例如 App\Listeners\SendShipmentNotification。
KevinYan
2019/10/13
1.2K0
Redis 分布式锁在 Laravel 任务调度底层实现中的应用
在 Laravel 项目中,我们可以基于任务调度功能非常轻松地管理 Crontab 定时任务,只需在 App\Console\Kernel 的 schedule 方法中定义所有需要调度的任务,类型包括 Artisan 命令、回调函数或者 Shell 脚本等:
学院君
2021/01/12
6.5K0
Redis 分布式锁在 Laravel 任务调度底层实现中的应用
基于 Redis 发布订阅 + Socket.io 实现事件消息广播功能
前面学院君给大家介绍了 Laravel 底层基于 Redis 列表驱动的消息队列实现原理,以及基于消息队列的事件监听和和处理,今天我们继续来看 Laravel 中另一个可以使用消息队列的场景 —— 事件广播,此外,我们还可以结合 Redis 发布/订阅功能完成广播系统的 Websocket 服务端实现。
学院君
2021/01/08
5K0
基于Model Event模型事件的Laravel实时APP
说明:本文主要来源于real-time-apps-laravel-5-1-event-broadcasting
botkenni
2022/01/10
6.1K0
基于Model Event模型事件的Laravel实时APP
Laravel核心概念:服务容器(ServiceContainer),服务提供者(Service Provider),门面(Facade),契约(Contracts)
学了两个多月的laravel一直没有去研究他的核心概念,在文档上看到些名词 “服务容器”,“服务提供者”...整个人人都是懵的下面结合我这几天的学习谈谈我的理解。
切图仔
2022/09/08
3K0
Laravel核心概念:服务容器(ServiceContainer),服务提供者(Service Provider),门面(Facade),契约(Contracts)
基于 Redis 消息队列实现邮件通知的异步发送
由于发送邮件、短信之类的操作通常涉及到第三方服务的调用,所以也是个响应时间不确定的耗时操作,如果放到处理用户请求进程中同步处理,需要等待很长时间才能获取响应结果,为了提升用户体验,可以让这些操作通过消息队列异步处理。
学院君
2021/01/22
3.3K0
PHP-web框架Laravel-队列(二)
在 Laravel 中,定义作业是通过实现 Illuminate\Contracts\Queue\Job 接口来完成的。这个接口定义了一个 fire 方法,用于处理作业的逻辑。下面是一个示例::
堕落飞鸟
2023/04/30
6810
推荐阅读
相关推荐
基于 Redis 实现 Laravel 广播功能(下):在私有频道和存在频道发布和接收消息
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档