Publish是Subject的一个基础子类。发送订阅后的数据流。
PublishSubject<Integer> publishSubject = PublishSubject.create();
BehaviorSubject会首先向他的订阅者发送截至订阅前最新的一个数据对象(或初始值),然后正常发送订阅后的数据流。
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(0);
我们创建了一个能发射整形(Integer)的BehaviorSubject。由于每当Observes订阅它时就会发射最新的数据,所以它需要一个初始值。
ReplaySubject会缓存它所订阅的所有数据,向任意一个订阅它的观察者重发:
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
当Observable完成时AsyncSubject只会发布最后一个数据给已经订阅的每一个观察者。
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
@Test
public void observerTest() {
PublishSubject<Integer> publishSubject = PublishSubject.create();
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(0);
ReplaySubject<Integer> replaySubject = ReplaySubject.create();
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
List<Subject> subjects = new ArrayList<>();
subjects.add(publishSubject);
subjects.add(behaviorSubject);
subjects.add(replaySubject);
subjects.add(asyncSubject);
for (Subject subject :
subjects) {
System.out.println("--------------------------------\n" + subject.getClass().getSimpleName() + " start");
subject.onNext(-3);
subject.onNext(-2);
subject.onNext(-1);
subject.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError(");
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
});
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onCompleted();
System.out.println(subject.getClass().getSimpleName() + " end\n================================\n");
}
}
--------------------------------
PublishSubject start
1
2
3
onCompleted
PublishSubject end
================================
--------------------------------
BehaviorSubject start
-1 # 如果前面没有发送数据, 此处会显示 0
1
2
3
onCompleted
BehaviorSubject end
================================
--------------------------------
ReplaySubject start
-3
-2
-1
1
2
3
onCompleted
ReplaySubject end
================================
--------------------------------
AsyncSubject start
3
onCompleted
AsyncSubject end
================================