前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rx.js 入门笔记

Rx.js 入门笔记

作者头像
copy_left
发布2019-08-21 16:44:09
2.9K0
发布2019-08-21 16:44:09
举报
文章被收录于专栏:方球

基本概念

  • Observable 可观察者, 生产数据
  • Observer 观察者, 消费数据
  • Subscription 订阅/可清理对象, 用以清理资源或中断Observeable执行
  • Subject 多播主体, 向多个订阅者广播数据
  • Operators 操作符, 处理数据的函数
数据获取方式, 推送/拉取
  • 数据的获取方式,表示了数据生产者和数据消费者之间的通信关系
  • 拉取: 由消费者控制何时获取数据, 例如:请求状态管理器中的状态指
  • 推送: 有生产者控制何时获取数据, 例如:向服务器请求数据
可观察者 Observable
  • 基础创建
代码语言:javascript
复制
import { Observable } from 'rxjs';

const ob = Observable.create(observer =>{
    // 定义观察者操作
    
    // next 推送数据
    observer.next(1)
    
    // complete 完成后台执行
    observer.complete();
    
    // unsubscrit 自定义中断订阅
    return () =>{....}
    
})
  • 其他创建方法, of, from, fromEvent, fromPromise, interval, range 等API
  • 订阅 subscribe() 当可观察者未被订阅时,将不会被执行
代码语言:javascript
复制
observable.subscribe( data => {
    ...
    执行数据操作
} )
  • 执行
  • next: 推送通知
  • error: 异常通知
  • complete: 完成通知
代码语言:javascript
复制
import { Observable } from 'rxjs';

const ob = Observable.create(observer =>{
    try{    
        observer.next(1)
        observer.complete();
    }catch(err) {
        observer.error(err);   
    }
})
观察者
  • 观察者定义了如何处理数据或错误
  • 观察者可配置三种数据处理方法
    • 'next':正常处理
    • 'error': 错误处理
    • 'complete': 完成处理
代码语言:javascript
复制
const observer = {
    next: data => console.log(data),
    error: err => console.error(err),
    complete: () => console.log('end')
}

// 执行订阅
observable.subscribe(observer);
订阅 Subscription
  • 提供清理数据,取消Observable执行, 取消订阅
代码语言:javascript
复制
const subscription = observable.subscribe(data => {....});

subscription.unsubscribe();
多播 Subject
  • 提供向多个订阅,发送通知的能力
  • subject 本身是观察者, 可以作为Observable 参数
代码语言:javascript
复制
// 创建对象
import { Subject } from 'rx.js';
const subject = new subject();

// 订阅
const A = subject.subscribe(data => console.log('a', data);

const B = subject.subscribe({
    next: data => console.log('b', data),
    error: err => console.error(err),
    complete: () => console.log('end')
    
});

// 发送通知
subject.nect(1);

// 解除订阅
A.unsubscribe();
  • 作为参数
代码语言:javascript
复制
const subject = new Subject();
const observable = Observable.from([1, 2]);

subject.subscribe(data =>{
    console.log(data)
})

// 执行订阅
observable.subscribe(subject);

>>> 1
>>> 2
  • multicast
    • 多播Observable 底层使用该操作符, 实现对多个订阅的通知
    • 通过该操作符,可以控制推送的时机
代码语言:javascript
复制
// 官方例子

// 创建Observable
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 绑定订阅, 此时调用的是 subject.subscribe(), 所以并不会推送通知
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// 开始执行, 在底层使用了 `source.subscribe(subject)`:
multicasted.conne
  • 多播变体
  • BehaviorSubject : 缓存当前已发送值
  • ReplaySubject : 记录历史值, 缓存以当前值向前某几位值, 或某段时间前的值
  • AsyncSubject :全体完成后,再发送通知
操作符
  • 声明式的函数调用(FP), 不修改原Observable, 而是返回新的Observable
  • 实例操作符: Observable 实例方法, 例如: multiplyByTen
  • 静态操作符: Observable 类方法 例如: of from interval
  • (操作符分类)[https://cn.rx.js.org/manual/overview.html#h15]

常用操作符

创建
  • of: 发送配置参数
代码语言:javascript
复制
import { of } from 'rxjs';

const data$ = of({id:1}, {id:2});

data$.subscribe(data => console.log(data));

// print
{id:1} ---- {id:2}
  • from: 输出可遍历对象子项
代码语言:javascript
复制
import { from } from 'rxjs';

const data$ = from([1, 2, 3]);
data$.subscribe(data => console.log(data));


// print
1 ---- 2 ---- 3
  • fromEvent: 绑定事件
代码语言:javascript
复制
const ele = document.getElementsById('ele');
const event$ = fromEvent(ele, 'click');

event$.subscribe(event => {
    console.log(evnet.target)
})

// 事件触发时,发出事件流
  • interval: 间隔发送(计时器)
代码语言:javascript
复制
const interval$ = interval(100);
interval$.subscribe( num => console.log(num) )
    
  100ms  100ms  100ms
0 ---- 1 ---- 2 ---- 3 ....
  • timer: 延时发送
代码语言:javascript
复制
// 初次延时
const timer$ = timer(100); 
timer$.subscribe(data => console.log(data));
console.log('run');

// print 
'run'
0

// 延时后,间隔发送
const timer$ = timer(100, 1000); // 100: 延时, 1000: 发送间隔 
timer$.subscribe(data => console.log(data));
console.log('run');

// print 
'run'
// 100ms
0
// 1000ms
1
.....
  • empty: 发送空信息
代码语言:javascript
复制
empty().subscribe(data => console.log(data);

// 无数据输出
  • never: 不发送信息
  • doc
代码语言:javascript
复制
never.subscribe(data => console.log(data));

// 无数据输出
  • trow: 发送错误
代码语言:javascript
复制
throw('This is error').subscribe({
    next: log,
    error: log,
    complete: () => log('complete')
})

// print
'This is error'
  • range: 发送指定队列
代码语言:javascript
复制
// range(star, len) start: 起始值, len: 数据长度
range(2, 2).subscribe(num => console.log(num));

// print
2 ---- 3
转换符
  • map: 函数应用于每个数据源
代码语言:javascript
复制
from([1, 2, 3]).map(item => item++).subscribe(end => console.log(end)

// print
2 ---- 3 ---- 4
  • mapTo: 使用配置指,替换源数据
代码语言:javascript
复制
from([1, 2, 3]).mapTo('value').subscribe(end => console.log(end));

// print
value ---- value ---- value
  • mergeMap: 拍平数据, 返回新Observable
  • doc
代码语言:javascript
复制
// 提取对象内数组数据,并转换为单一数据向外发送
const obj$ = of({arr: [1, 2, 3]});

obj$.pluck('arr')
    .mergeMap(arr => from(arr) ) // 拍平数据
    .subscribe(num => console.log(num));
    
// print

1 ---- 2 ---- 3 
// 这里将数组拆解,作为单一项目向外发送
  • mergeMapTo: 拍平数据, 使用配置指替代源指, 类似 map与mapTo的关系
代码语言:javascript
复制
const obj$ = of({arr: [1, 2, 3]});

obj$.pluck('arr')
    .mergeMapTo('str') // 拍平数据
    .subscribe(num => console.log(num));

// print
str ---- str ---- str
  • pluck;取出指定属性值
代码语言:javascript
复制
const obj$ = of([{nickname: 'coco', age:  12}, {nickname: 'jeck', age: 23} ]);

obj$.pluck(0, 'nickname').subscribe(...);

// print
'coco'
  • concatMap: 合并流,前一个流将作为后一个流的处罚机制
  • doc
代码语言:javascript
复制
const prefix$ = from(['hot', 'remind']);
const next$ = prefix$.concatMap( pre => form(['news', 'info']).map(item => pre +' '+next ) ).subscribe(...)

// print

'hot' ------------------------ 'remind'
'hot news'--- 'hot info' ----- 'remind news' --- 'remind info'

/*
** 后续Observable 可以操作前一个Oberservable发出的数据流,
** 也可以只发送自己的数据留,前一个留只作为触发机制
  • concatMapTo: 类似 map 与 mapTo , 替换源数据值
  • scan: 记录上次回调执行结果
  • doc
代码语言:javascript
复制
// 第一参数为执行回调, 第二参数为初始值
from([1, 2, 3]).scan((a, b) => a+b, 0).subscriba(...)

// print
1 ---- 2 ---- 3
1+0    1+2    3+3
1 ---- 3 ---- 6

// 其他特殊操作
from([1, 2]).scan((a, b) => [...a, b], []);

// print
[1] --- [1, 2]

// 使用数组记录每次发送的值
  • repeat; 重复发送流
  • doc
代码语言:javascript
复制
const num$ = of(1);
num$.repeat(2).subscribe(num => console.log(num) );

// print 
1 ---- 1
  • margeScan: 类似数据流经过scan后在经过 margeMap 处理
代码语言:javascript
复制
// 需要赋初始值,否则结果为NaN, (undefined + number)
form([1, 2]).margeScan( (a, b) => of( a + b), 0 ).subscribe(...)


// print 
1 --- 3 ---- 6
过滤
  • debounceTime: 上游停止发送一段时间后,将最新值发出
代码语言:javascript
复制
from([1, 2, 3]).debounceTime(1000).subscribe(...)

// print
3
  • defultIfEmpty: 上有完成未发出数据,将使用默认值
代码语言:javascript
复制
empty().defultIfEmpty(null).subscribe(...);

// print
null
  • delay 向后延时发送
代码语言:javascript
复制
of(1).delay(1000).subscribe(...)
console.log('after');

// print

'after'
1
  • delayWhen 延时至下游Obersevabel发送数据时,再将数据向下流
代码语言:javascript
复制
of(1).delayWhen( data => interval(1000) ).subscribe(...)

// print
1000ms
------- 1
  • do 不中断流的情况下执行自定义回调
代码语言:javascript
复制
form([1, 2, 3]).do({
 next: num => console.log(num),
 error: error => console.log(error),
 complete: () => console.log('complete')
}).subscribe(...)

// print 
11 ---- 12 ---- 13
  • elementAt 只发送某一次数据
代码语言:javascript
复制
interval(500).elementAt(2).subscribe(...);

// print
2
  • filter 发送符合条件数据
代码语言:javascript
复制
// 过滤偶数
interval(1000).filter( num => num%2 === 0 ).subscribe(...);

// print
0 ---- 2 ---- 4 ....
  • find 发送第一个符合条件数据
代码语言:javascript
复制
interval(1000).find(num => num === 2).subscribe(...);

// print
2
  • findIndx 发送第一个符合条件数据的编号
代码语言:javascript
复制
from([1, 11, 13]).findIndex( num => num > 10 ).subscribe(...);

// print
1
  • first 发送第一个值
代码语言:javascript
复制
interval(100).first().subscribe(...)

// print
0
  • last 发送最后一个指
代码语言:javascript
复制
from([1, 2, 3]).last().subscribe(...)

// print
3
  • every 验证数据每一项都否符合要求, 返回布尔值
代码语言:javascript
复制
range(0, 3).every(num < 3).subscribe(...);

// print
true

// 完成时,返回最终值
  • isEmpty 验证数据是否为空
代码语言:javascript
复制
empty().isEmpty().subscribe(...);

// print
true
  • max 通过比较函数,返回最大值
  • min 通过比较函数, 返回最小值
代码语言:javascript
复制
// 通过自定义函数做判断
from(['coco', 'py', 'nobody']).max((a, b) =>  a.length > b.length ? 1 : -1 ).subscribe(...);


// print
'nobody'
  • take 只发送前n个数据
代码语言:javascript
复制
interval(1000).take(2).subscribe(...)

// print
0 ---- 1
  • takeLast 只发送最后n个数据, 完成后一同发出
代码语言:javascript
复制
range(0, 10).takelast(2).subscribe(...);

// print
9 ---- 10
  • takeUntil 发送数据直到下游Oberservable开始发送数据
代码语言:javascript
复制
interval(500).takeUnitl( of('down').delay(1000) ).subscrivbe(...)

// print
0
  • takeWhile 当条件不满足时终止
代码语言:javascript
复制
interval(100).takeWhile( num => num < 3 ).subscribe(...)

// print
0 --- 1 -- 2
组合
  • switch: 当上游发出数据时,将新开一个下游Obsevable, 并中断前一下游数据流
  • doc
代码语言:javascript
复制
interval(1000).switchMap(pre => interval(300)).subscribe(...);

// print
0 -------------- 1 ----------- 
0 --- 1 --- 2 --- 0 --- 1 --- 2

// 需要注意的是当上游发送频率大于下游时,下游将无法正常发送数据.
  • concat 合并多个不同的流,按先后顺序输出
代码语言:javascript
复制
const a$ = range(0, 3)
const b$ = range(10, 3)

a$.contact(b$).subscribe(...);

// print
0 --- 1 --- 2 --- 10 --- 11 --- 12
  • concat 按顺序执行订阅,只有当一个内部Observable后再执行下一个Observable
代码语言:javascript
复制
range(0, 3)
.do(num => console.log(num)
.map(num => of('next'))
.concatAll()
.subscribe(...)

// print

0 --- next --- 1 --- next --- 2 --- next

/*
** 这里将每个上游值转化为Obervable, 当上游执行完
** 将调用下游值,将数据合并到同一流中
*/
  • merge 合并多个流,拍平数据
代码语言:javascript
复制
const first$ = interva(500).mapTo('first');
const secend$ = interva(500).mapTo('secend');

first$.merge(secend$).subscribe(...)

// print 
'first' --- 'secend' --- 'first' --- 'secend'
  • zip 将多个流数据合并,依次发出
代码语言:javascript
复制
const a$ = range(0, 4);
const b$ = from(['coco', 'jeck', 'mike']);
const c$ = of(true, true, false);

Observable.zip( a$, b$, c$ ).subscribe(...);

// print

[ 0, 'coco', true ] ---- [ 1, 'jeck', true ] ---- [ 2, 'mike', false ]

/*
** 注意;只有当所有子流同次,都有数据发送时,才能获取最终数据 
** 上面例子中 a$ 将多发送一次数据,当最终不会被输出
*/
错误处理
  • catch 捕获错误,返回新的Observable 或 error
  • retry 重试Observable, 达到次数后终止
  • retryWhen
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.07.29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本概念
    • 数据获取方式, 推送/拉取
      • 可观察者 Observable
        • 观察者
          • 订阅 Subscription
            • 多播 Subject
              • 操作符
              • 常用操作符
                • 创建
                  • 转换符
                    • 过滤
                      • 错误处理
                      相关产品与服务
                      云服务器
                      云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档