Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >直播场景下-异步消息处理机制

直播场景下-异步消息处理机制

作者头像
吴文周
发布于 2023-09-01 03:43:14
发布于 2023-09-01 03:43:14
24100
代码可运行
举报
文章被收录于专栏:吴文周的专栏吴文周的专栏
运行总次数:0
代码可运行

抛出问题

  • js最大特色之一就是它的事件机制与回调处理,这一特色有利有弊,有利在于非阻塞性,有弊在与异步场景下面支持不太友好。
  • 快速进入主题在现实场景中往往需要同步处理或者串行处理,这个就有些为难了。
  • 场景一:直播过程中需要我们向服务端有序发送消息,且保证消息的发送达到。如果单纯的使用ajax请求不能保证请求的有序性,例如虽然发送的两条消息,先是1+1=?,然后发送答案是2,由于网络请求的不确定性,可能到达服务端或者其他端出现先接受到答案是2,再收到1+1=?,这样的结果显然是不对的。
  • 场景二:直播过程中获取服务端推送的消息,按照时间区块切割统一处理,一个时间段内接受到的消息统一绘制列表,如果一有消息就处理渲染性能就会受到影响,合并处理是提高性能的一种方式。
  • 场景三:直播锁定,房间切换或者其他切换行为都是都是直播细粒化的场景,不能继续执行之前的逻辑,需要暂停操作,还有断网重试等等。
  • 核心观点-所有脱离业务场景的技术讨论都是耍流氓,我们当前讨论就是在直播业务过程中或者需要异步消息串行处理的场景。

常见的解决方案

  • 可以查看这位大神的总结 链接js异步编程
  • 总结callback -> promise -> generator -> async + await,这样一些解决方案

方案分析

  • 虽然我们可以使用例如promise或者其他的方式处理异步请求,在固定请求的场景下面是很容易解决的,例如三个请求控制顺序,控制返回,这里不再赘述。但是实时数据发送与三个固定请求的场景相比还是要复杂很多的。
  • 我们需要失败重试,消息先进先出,上一个处理完成,才能继续处理下一个。还需要消息缓存,一次性处理多条数据的渲染等等。
  • 哪怕使用async + await 也会使得我们代码结构相对复杂,不能抽象重用。
  • 那么究竟该如何实现呢?

实现思路

  • 核心思路一:消息有序,使用队列设计实现先进先出。统一的数据管理可以实现,可追溯,可管理,可查看。
  • 核心思路二:消息需要生产,需要确认消费,如果消息还没有被消费(在向服务端发送请求的过程中,或者返回失败),消息需要一直存在,只有向服务端发送成功,消息才能被移除出队列
  • 核心思路三:消息流程控制,需要设定重试次数,向服务端发送请求,如果失败,可以重试几次,保证消息有序,正常。需要控制消息接收处理的时间窗口,不仅仅有接收到服务端的消息,还有自己发送的消息,在一个时间窗口内统一绘制dom列表,防止多次渲染,影响性能,这里使用了第三方的库rxjs(好处不用多说,封装好的api,可以取消等等)正是使用了rxjs借助其提供的api能力可以很好的实现取消订阅,暂停操作,断网重试等等。
  • 核心思路四:链路闭环,消息生产-进入队列缓存-消息消费-消费确认-继续消费。使用什么才能使得这一切闭环呢?答案是观察者模式,其实我们只要订阅队列数据的变化,当数据发生变化的时候,我们就开始消费队列中的数据,数据发送成功到达服务端,确认消费,更新队列数据(即删除最先进入的数据),然后继续下面的操作。看着这个图还用想吗?当然使用proxy做了。

纸上得来终觉浅,绝知此事要躬行

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @name: LiveMQ
 * @msg: 消息基础类,实现队列功能
 */
class LiveMQ {
  public queue: Array<any>;// 队列数据
  public callback: (message) => void;// 接受到消息,处理回调函数
  public handler = {};// proxy的 handler 为了数据劫持
  constructor() {}
  //入队
  public enqueue() {
    var len = arguments.length;
    if (len == 0) {
      return;
    }
    for (var i = 0; i < len; i++) {
      this.queue.push(arguments[i]);
    }
  }
  //出队
  dequeue() {
    var result = this.queue[0];
    return typeof result != "undefined" ? result : new Error("error");
  }
  // 确认消费
  confirm() {
    this.queue.splice(0, 1);
  }
  //队列是否为空
  isEmpty() {
    return this.queue.length === 0;
  }
  //返回队列长度
  size() {
    return this.queue.length;
  }
  //清空队列
  clear() {
    this.queue = new Proxy([], this.handler);
  }
  //返回队列
  show() {
    return this.queue;
  }
}
  • 开始第二步实现消息队列有序消费(可以用来向服务端发送不同的消息,或者接受消息绘制dom)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @name: LiveHandleMQ
 * @msg: 有序消息队列处理
 */
class LiveHandleMQ extends LiveMQ {
  private lock = false;// 处理消息过程中加锁,处理结束解锁
  private retry: number;// 重试此次
  private observer: any;// 观察者
  private subscription: any;// 订阅者
  public handler = {
    set: (target, key, value, receiver) => {
       // 队列长度变化时候触发消费数据
      if (!this.lock && value > 0 && key == "length") {
        this.subscribe();
      }
      return Reflect.set(target, key, value, receiver);
    },
  };
  constructor(callback: (arg) => void, retry: number = 0) {
    super();
    // 重试次数合法性校验
    if (retry % 1 === 0 && retry >= 0) {
      this.callback = callback;
      this.retry = retry;
       // 使用Proxy 劫持队列数据变化
      this.queue = new Proxy([], this.handler);
    } else {
      console.error("retry is not legitimate");
    }
  }
  private subscribe() {
    this.lock = true;
    this.observer = window["Rx"].Observable.create(async (observer) => {
      try {
        await this.callback(this.dequeue());
        observer.next("");
        observer.complete();
      } catch (error) {
        console.log("出错了重试");
        observer.error(error);
      }
    }).retry(this.retry);
    this.subscription = this.observer.subscribe({
      next: () => {
        this.next();
      },
      error: () => {
        this.next();
      },
    });
  }
  /**
   * @name: next
   * @msg: 下一步调用
   */
  private next() {
    // 确认消费
    this.confirm();
    // 队列中是否还有其他数据需要消费,如果有数据继续消费,如果没有解锁
    if (!this.isEmpty()) {
      this.subscribe();
    } else {
      this.lock = false;
    }
  }
  /**
   * @name: destroy
   * @msg: 清除订阅
   */
  destroy() {
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}
  • 开始第三步时间区间收集本地消息,服务端消息,生产的消息进入队列中等待处理
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * @name: LiveCollectionMQ
 * @msg: 区间数据采集队列缓存
 */
class LiveCollectionMQ extends LiveMQ {
  private emitter = window["mitt"]();// 内部事件
  private bufferTime: number;// 采集数据时间区间
  private observer: any;
  private subscription: any;
  private mq: any;// 消息处理者
  public handler = {
    set: (target, key, value, receiver) => {
       // 监听队列中的每个数据变化
      if (!isNaN(Number(key))) {
        this.emitter.emit("notify", value);
      }
      return Reflect.set(target, key, value, receiver);
    },
  };
  constructor(callback: (arg) => void, bufferTime: number = 1000) {
    super();
    if (bufferTime % 1 === 0 && bufferTime > 0) {
      const _this = this;
      this.mq = new LiveHandleMQ(callback);
      this.bufferTime = bufferTime;
      this.queue = new Proxy([], this.handler);
      // 订阅内部事件数据
      this.observer = window["Rx"].Observable.fromEventPattern(
        function addHandler(h) {
          _this.emitter.on("notify", h);
        },
        function delHandler(h) {
          _this.emitter.off("notify", h);
        }
      );
      this.subscription = this.observer
        .bufferTime(_this.bufferTime)
        .subscribe((messages) => {
          if (messages.length > 0) {
            this.mq.enqueue(messages);
          }
        });
    } else {
      console.error("bufferTime is not legitimate");
    }
  }
  /**
   * @name: destroy
   * @msg: 清除订阅
   */
  destroy() {
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
    this.mq.destroy();
  }
}
  • 跑个单元测试,ts 生产live.js 命令tsc
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<!DOCTYPE html>
<html lang="en">
  <body>
    <button type="button" id="xxx">点我发消息</button>
    <script src="https://unpkg.com/@reactivex/rxjs@5.0.0-beta.1/dist/global/Rx.umd.js"></script>
    <script src="https://unpkg.com/mitt/dist/mitt.umd.js"></script>
    <script src="live.js"></script>
    <script>
     // 异步处理函数
      function test(mes, observer) {
        return new Promise((resolve, reject) => {
          let time = Math.ceil(Math.random() * 10000);
          console.log("time", time, mes);
          setTimeout(() => {
            if (false) {
              resolve();
            } else {
              reject();
            }
          }, time);
        });
      }
      // 单纯的执行函数
      function test1(mes) {
        console.log(mes);
      }
      var count = 0;
      // var queue = new LiveHandleMQ(test, 3);
      // 实例化对象
      var queue = new LiveCollectionMQ(test, 10000);
      document.getElementById("xxx").addEventListener("click", function () {
        count++;
        // 数据进入队里
        queue.enqueue(count);
        if (count > 10) {
          //提供声明周期的销毁函数
          queue.destroy();
        }
      });
    </script>
  </body>
</html>

总结

  • 对rxjs的使用还是比较浅薄的,在这个场景下面rx是不是更大的发展空间是未知的也是自己需要不断学习的
  • 编写可维护的代码就是,代码逻辑清晰,代码方法高可用,可迁移。
  • 最后祝大家牛年大吉,加油,加油,加油!!!
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-02-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
(新年快乐)直播场景下-异步消息处理机制
.markdown-body{word-break:break-word;line-height:1.75;font-weight:400;font-size:15px;overflow-x:hidden;color:#333}.markdown-body h1,.markdown-body h2,.markdown-body h3,.markdown-body h4,.markdown-body h5,.markdown-body h6{line-height:1.5;margin-top:35px;margin-bottom:10px;padding-bottom:5px}.markdown-body h1{font-size:30px;margin-bottom:5px}.markdown-body h2{padding-bottom:12px;font-size:24px;border-bottom:1px solid #ececec}.markdown-body h3{font-size:18px;padding-bottom:0}.markdown-body h4{font-size:16px}.markdown-body h5{font-size:15px}.markdown-body h6{margin-top:5px}.markdown-body p{line-height:inherit;margin-top:22px;margin-bottom:22px}.markdown-body img{max-width:100%}.markdown-body hr{border:none;border-top:1px solid #ddd;margin-top:32px;margin-bottom:32px}.markdown-body code{word-break:break-word;border-radius:2px;overflow-x:auto;background-color:#fff5f5;color:#ff502c;font-size:.87em;padding:.065em .4em}.markdown-body code,.markdown-body pre{font-family:Menlo,Monaco,Consolas,Courier New,monospace}.markdown-body pre{overflow:auto;position:relative;line-height:1.75}.markdown-body pre>code{font-size:12px;padding:15px 12px;margin:0;word-break:normal;display:block;overflow-x:auto;color:#333;background:#f8f8f8}.markdown-body a{text-decoration:none;color:#0269c8;border-bottom:1px solid #d1e9ff}.markdown-body a:active,.markdown-body a:hover{color:#275b8c}.markdown-body table{display:inline-block!important;font-size:12px;width:auto;max-width:100%;overflow:auto;border:1px solid #f6f6f6}.markdown-body thead{background:#f6f6f6;color:#000;text-align:left}.markdown-body tr:nth-child(2n){background-color:#fcfcfc}.markdown-body td,.markdown-body th{padding:12px 7px;line-height:24px}.markdown-body td{min-width:120px}.markdown-body blockquote{color:#666;padding:1px 23px;margin:22px 0;border-left:4px solid #cbcbcb;background-color:#f8f8f8}.markdown-body blockquote:after{display:block;content:""}.markdown-body blockquote>p{margin:10px 0}.markdown-body ol,.markdown-body ul{padding-left:28px}.markdown-body ol li,.markdown-body
吴文周
2021/02/12
8350
(新年快乐)直播场景下-异步消息处理机制
Android异步消息处理机制完全解析,带你从源码的角度彻底理解
之前也是由于周末通宵看TI3比赛,一直没找到时间写博客,导致已经有好久没更新了。惭愧!后面还会恢复进度,尽量保证每周都写吧。这里也是先恭喜一下来自瑞典的Alliance战队夺得了TI3的冠军,希望明年
用户1158055
2018/01/05
8410
Android异步消息处理机制完全解析,带你从源码的角度彻底理解
Android Handler 消息处理机制
日常开发中,一般不会在子线程中直接进行 UI 操作,大部分采取的办法是创建 Message 对象,然后借助 Handler 发送出去,再在 Handler 的 handlerMessage() 方法中获取 Message 对象,进行一系列的 UI 操作。Handler 负责发送 Message, 又负责处理 Message, 其中经历了什么 ,需要从源码中一探究竟。
用户3596197
2018/10/15
5100
Angular快速学习笔记(4) -- Observable与RxJS
介绍RxJS前,先介绍Observable 可观察对象(Observable) 可观察对象支持在应用中的发布者和订阅者之间传递消息。 可观察对象可以发送多个任意类型的值 —— 字面量、消息、事件。 基本用法和词汇 作为发布者,你创建一个 Observable 的实例,其中定义了一个订阅者(subscriber)函数。 当有消费者调用 subscribe() 方法时,这个函数就会执行。 订阅者函数用于定义“如何获取或生成那些要发布的值或消息”。 要执行所创建的可观察对象,并开始从中接收通知,你就要调用它的 s
JadePeng
2018/05/28
6K0
笔记:安卓App消息处理机制
内容简述 类似Binder机制,MessageQueue、Looper也有底层的C++实现,涉及文件管道和驱动等。 以下仅从Java层的Looper、Handler和MessageQueue等相关类
用户1172465
2018/01/08
1.2K0
Android异步消息处理机制完全解析-Handler详解
为了更直观的看到报错原因,我们找到源码ViewRootImpl的checkThread方法,看它做了些什么。
Javen
2018/08/21
9110
Android异步消息处理机制完全解析-Handler详解
继续解惑,异步处理 —— RxJS Observable
接上一篇《Js 异步处理演进,Callback=>Promise=>Observer》,可能不少掘友对 Observer 还心存疑虑,本篇继续解惑~
掘金安东尼
2022/09/19
1.3K0
继续解惑,异步处理 —— RxJS Observable
Handler消息处理机制详解
Handler封装了消息的发送,也负责接收消。通过post方法和sendMessage发送消息。内部会跟Looper关联。
用户7557625
2020/07/15
5410
Android消息处理机制
Google参考了Windows的消息处理机制,在Android系统中实现了一套类似的消息处理机制。学习Android的消息处理机制,有几个概念(类)必须了解:
全栈程序员站长
2022/07/20
4810
Android消息处理机制
RxJava2 解析
通过ObservableEmitter 被观察者的发射器,做发送数据、错误、完成等操作,是一个接口,继承自Emitter。
Yif
2019/12/26
1.3K0
深入浅出 RxJS 之 创建数据流
所谓创建类操作符,就是一些能够创造出一个 Observable 对象的方法,所谓“创造”,并不只是说返回一个 Observable 对象,因为任何一个操作符都会返回 Observable 对象,这里所说的创造,是指这些操作符不依赖于其他 Observable 对象,这些操作符可以凭空或者根据其他数据源创造出一个 Observable 对象。
Cellinlab
2023/05/17
2.5K0
深入浅出 RxJS 之 创建数据流
用 C 语言操作 Kafka :基于 librdkafka 的开发指南
librdkafka 是一个高性能的 C/C++ Kafka 客户端库,提供了可靠的消息传递机制和丰富的功能。要使用 librdkafka,需要将其安装到系统中。
Lion 莱恩呀
2025/04/25
2370
用 C 语言操作 Kafka :基于 librdkafka 的开发指南
RxJava的消息发送和线程切换
RxJava相信大家都非常了解吧,今天分享一下RxJava的消息发送和线程源码的分析。最后并分享一个相关demo,让大家更加熟悉我们天天都在用的框架。
HelloJack
2018/12/05
9290
RxJava的消息发送和线程切换
【MQ05】异常消息处理
上节课我们已经学习到了消息的持久化和确认相关的内容。但是,光有这些还不行,如果我们的消费者出现问题了,无法确认,或者直接报错产生异常了,这些消息要怎么处理呢?直接丢弃?这就是丢消息了呀。再次处理?一直继续报错怎么办?这条消息就永远都在不停报错的死循环中了。
硬核项目经理
2024/02/29
2820
【MQ05】异常消息处理
RxJS 入门到搬砖 之 Scheduler
什么是 Scheduler ? scheduler 控制 subscription 什么时候开始和通知什么时候派发。
Cellinlab
2023/05/17
6340
[译]Rxjs&Angular-退订可观察对象的n种方式
在angular项目中我们不可避免的要使用RxJS可观察对象(Observables)来进行订阅(Subscribe)和退订(Unsubscribe)操作;
laggage
2021/02/05
1.6K0
[译]Rxjs&Angular-退订可观察对象的n种方式
锦囊篇|一文摸懂RxJava
(1)包结构变化RxJava 3 components are located under the io.reactivex.rxjava3 package (RxJava 1 has rx and RxJava 2 is just io.reactivex. This allows version 3 to live side by side with the earlier versions. In addition, the core types of RxJava (Flowable, Observer, etc.) have been moved to io.reactivex.rxjava3.core.为了阅读障碍的朋友们给出我的一份四级水准翻译,有以下的几点变化:
ClericYi
2020/06/23
8510
RxJS Subject
观察者模式,它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。
阿宝哥
2019/11/06
2.3K0
探究Android异步消息的处理之Handler详解
在学习Android的路上,大家肯定会遇到异步消息处理,Android提供给我们一个类来处理相关的问题,那就是Handler。相信大家大多都用过Handler了,下面我们就来看看Handler最简单的用法:
俞其荣
2022/07/28
5610
RxJs简介
这两年,各种异步编程框架,上面RxJava,RxAndroid,RxSwift等等,今天要聊的是RxJs,对于我等入门不久的前端工程师来说,这个框架还是比较有新颖的,中文官网地址:http://cn.rx.js.org/
xiangzhihong
2022/11/30
4K0
推荐阅读
相关推荐
(新年快乐)直播场景下-异步消息处理机制
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验