import { Observable } from 'rxjs';
const ob = Observable.create(observer =>{
// 定义观察者操作
// next 推送数据
observer.next(1)
// complete 完成后台执行
observer.complete();
// unsubscrit 自定义中断订阅
return () =>{....}
})
当可观察者未被订阅时,将不会被执行
observable.subscribe( data => {
...
执行数据操作
} )
next
: 推送通知error
: 异常通知complete
: 完成通知import { Observable } from 'rxjs';
const ob = Observable.create(observer =>{
try{
observer.next(1)
observer.complete();
}catch(err) {
observer.error(err);
}
})
const observer = {
next: data => console.log(data),
error: err => console.error(err),
complete: () => console.log('end')
}
// 执行订阅
observable.subscribe(observer);
const subscription = observable.subscribe(data => {....});
subscription.unsubscribe();
// 创建对象
import { Subject } from 'rx.js';
const subject = new subject();
// 订阅
const A = subject.subscribe(data => console.log('a', data);
const B = subject.subscribe({
next: data => console.log('b', data),
error: err => console.error(err),
complete: () => console.log('end')
});
// 发送通知
subject.nect(1);
// 解除订阅
A.unsubscribe();
const subject = new Subject();
const observable = Observable.from([1, 2]);
subject.subscribe(data =>{
console.log(data)
})
// 执行订阅
observable.subscribe(subject);
>>> 1
>>> 2
// 官方例子
// 创建Observable
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// 绑定订阅, 此时调用的是 subject.subscribe(), 所以并不会推送通知
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// 开始执行, 在底层使用了 `source.subscribe(subject)`:
multicasted.conne
import { of } from 'rxjs';
const data$ = of({id:1}, {id:2});
data$.subscribe(data => console.log(data));
// print
{id:1} ---- {id:2}
import { from } from 'rxjs';
const data$ = from([1, 2, 3]);
data$.subscribe(data => console.log(data));
// print
1 ---- 2 ---- 3
const ele = document.getElementsById('ele');
const event$ = fromEvent(ele, 'click');
event$.subscribe(event => {
console.log(evnet.target)
})
// 事件触发时,发出事件流
const interval$ = interval(100);
interval$.subscribe( num => console.log(num) )
100ms 100ms 100ms
0 ---- 1 ---- 2 ---- 3 ....
// 初次延时
const timer$ = timer(100);
timer$.subscribe(data => console.log(data));
console.log('run');
// print
'run'
0
// 延时后,间隔发送
const timer$ = timer(100, 1000); // 100: 延时, 1000: 发送间隔
timer$.subscribe(data => console.log(data));
console.log('run');
// print
'run'
// 100ms
0
// 1000ms
1
.....
empty().subscribe(data => console.log(data);
// 无数据输出
never.subscribe(data => console.log(data));
// 无数据输出
throw('This is error').subscribe({
next: log,
error: log,
complete: () => log('complete')
})
// print
'This is error'
// range(star, len) start: 起始值, len: 数据长度
range(2, 2).subscribe(num => console.log(num));
// print
2 ---- 3
from([1, 2, 3]).map(item => item++).subscribe(end => console.log(end)
// print
2 ---- 3 ---- 4
from([1, 2, 3]).mapTo('value').subscribe(end => console.log(end));
// print
value ---- value ---- value
// 提取对象内数组数据,并转换为单一数据向外发送
const obj$ = of({arr: [1, 2, 3]});
obj$.pluck('arr')
.mergeMap(arr => from(arr) ) // 拍平数据
.subscribe(num => console.log(num));
// print
1 ---- 2 ---- 3
// 这里将数组拆解,作为单一项目向外发送
const obj$ = of({arr: [1, 2, 3]});
obj$.pluck('arr')
.mergeMapTo('str') // 拍平数据
.subscribe(num => console.log(num));
// print
str ---- str ---- str
const obj$ = of([{nickname: 'coco', age: 12}, {nickname: 'jeck', age: 23} ]);
obj$.pluck(0, 'nickname').subscribe(...);
// print
'coco'
const prefix$ = from(['hot', 'remind']);
const next$ = prefix$.concatMap( pre => form(['news', 'info']).map(item => pre +' '+next ) ).subscribe(...)
// print
'hot' ------------------------ 'remind'
'hot news'--- 'hot info' ----- 'remind news' --- 'remind info'
/*
** 后续Observable 可以操作前一个Oberservable发出的数据流,
** 也可以只发送自己的数据留,前一个留只作为触发机制
// 第一参数为执行回调, 第二参数为初始值
from([1, 2, 3]).scan((a, b) => a+b, 0).subscriba(...)
// print
1 ---- 2 ---- 3
1+0 1+2 3+3
1 ---- 3 ---- 6
// 其他特殊操作
from([1, 2]).scan((a, b) => [...a, b], []);
// print
[1] --- [1, 2]
// 使用数组记录每次发送的值
const num$ = of(1);
num$.repeat(2).subscribe(num => console.log(num) );
// print
1 ---- 1
// 需要赋初始值,否则结果为NaN, (undefined + number)
form([1, 2]).margeScan( (a, b) => of( a + b), 0 ).subscribe(...)
// print
1 --- 3 ---- 6
from([1, 2, 3]).debounceTime(1000).subscribe(...)
// print
3
empty().defultIfEmpty(null).subscribe(...);
// print
null
of(1).delay(1000).subscribe(...)
console.log('after');
// print
'after'
1
of(1).delayWhen( data => interval(1000) ).subscribe(...)
// print
1000ms
------- 1
form([1, 2, 3]).do({
next: num => console.log(num),
error: error => console.log(error),
complete: () => console.log('complete')
}).subscribe(...)
// print
11 ---- 12 ---- 13
interval(500).elementAt(2).subscribe(...);
// print
2
// 过滤偶数
interval(1000).filter( num => num%2 === 0 ).subscribe(...);
// print
0 ---- 2 ---- 4 ....
interval(1000).find(num => num === 2).subscribe(...);
// print
2
from([1, 11, 13]).findIndex( num => num > 10 ).subscribe(...);
// print
1
interval(100).first().subscribe(...)
// print
0
from([1, 2, 3]).last().subscribe(...)
// print
3
range(0, 3).every(num < 3).subscribe(...);
// print
true
// 完成时,返回最终值
empty().isEmpty().subscribe(...);
// print
true
// 通过自定义函数做判断
from(['coco', 'py', 'nobody']).max((a, b) => a.length > b.length ? 1 : -1 ).subscribe(...);
// print
'nobody'
interval(1000).take(2).subscribe(...)
// print
0 ---- 1
range(0, 10).takelast(2).subscribe(...);
// print
9 ---- 10
interval(500).takeUnitl( of('down').delay(1000) ).subscrivbe(...)
// print
0
interval(100).takeWhile( num => num < 3 ).subscribe(...)
// print
0 --- 1 -- 2
interval(1000).switchMap(pre => interval(300)).subscribe(...);
// print
0 -------------- 1 -----------
0 --- 1 --- 2 --- 0 --- 1 --- 2
// 需要注意的是当上游发送频率大于下游时,下游将无法正常发送数据.
const a$ = range(0, 3)
const b$ = range(10, 3)
a$.contact(b$).subscribe(...);
// print
0 --- 1 --- 2 --- 10 --- 11 --- 12
range(0, 3)
.do(num => console.log(num)
.map(num => of('next'))
.concatAll()
.subscribe(...)
// print
0 --- next --- 1 --- next --- 2 --- next
/*
** 这里将每个上游值转化为Obervable, 当上游执行完
** 将调用下游值,将数据合并到同一流中
*/
const first$ = interva(500).mapTo('first');
const secend$ = interva(500).mapTo('secend');
first$.merge(secend$).subscribe(...)
// print
'first' --- 'secend' --- 'first' --- 'secend'
const a$ = range(0, 4);
const b$ = from(['coco', 'jeck', 'mike']);
const c$ = of(true, true, false);
Observable.zip( a$, b$, c$ ).subscribe(...);
// print
[ 0, 'coco', true ] ---- [ 1, 'jeck', true ] ---- [ 2, 'mike', false ]
/*
** 注意;只有当所有子流同次,都有数据发送时,才能获取最终数据
** 上面例子中 a$ 将多发送一次数据,当最终不会被输出
*/