在RxJS中,Observable<T>
是一个基础类,用于表示可观察的数据流。要从Observable<T>
继承并创建自定义的Observable类,通常需要遵循以下步骤:
Observable
类,可以创建自定义的Observable,添加特定逻辑或行为。extends
关键字继承Observable<T>
。super()
并传入订阅逻辑(subscriber
函数)。import { Observable, Subscriber } from 'rxjs';
class CustomObservable<T> extends Observable<T> {
constructor(private sourceData: T[], private delayMs: number) {
super((subscriber: Subscriber<T>) => {
let index = 0;
const emitNextValue = () => {
if (index < this.sourceData.length) {
subscriber.next(this.sourceData[index++]);
setTimeout(emitNextValue, this.delayMs);
} else {
subscriber.complete();
}
};
emitNextValue();
});
}
// 自定义方法示例
public transform<U>(mapper: (value: T) => U): CustomObservable<U> {
return new CustomObservable<U>(
this.sourceData.map(mapper),
this.delayMs
);
}
}
// 使用示例
const data = [1, 2, 3];
const customObs = new CustomObservable<number>(data, 1000);
customObs.subscribe({
next: (value) => console.log(`Received: ${value}`),
complete: () => console.log('Completed'),
});
// 使用自定义方法
const transformed = customObs.transform(x => x * 2);
transformed.subscribe(value => console.log(`Transformed: ${value}`));
transform
)或覆盖默认行为。<T>
保持类型一致性。super()
中定义订阅时的行为(即subscriber
函数)。unsubscribe
时清理(可通过返回TeardownLogic
实现)。map
、filter
)。问题:为什么订阅后没有数据发射?
subscriber
函数中调用next()
或逻辑错误(如条件判断失误)。next
/complete
,使用调试工具(如tap
)跟踪流。问题:如何取消订阅?
Subscription
对象的unsubscribe()
方法:Subscription
对象的unsubscribe()
方法:通过继承Observable<T>
,可以灵活地创建符合特定需求的响应式数据流。
没有搜到相关的文章