RxJava是使用Java实现的响应式编程库,RxJava即 Reactive Extensions Java。目前有两个版本RxJava1和RxJava2,推荐使用RxJava2,RxJava1已经停止支持了
ReactiveX即Reactive Extensions,它通过可观测的序列,实现了组合异步和事件驱动.它是一种编程方式。ReactiveX主要的实现方式是扩展 观察者模式 来达到自己的目的。ReactiveX更多介绍戳官网
- cold:每次订阅和其它任何订阅它的取到的数据都是一样的
- hot:只能获取从订阅那一刻开始的数据,后续订阅的不能获取之前已经产生的数据Observer的方法介绍
onNext : 每次想通知 Observer 数据变化的时候,Observer的onNext方法就会被调用
- PublishSubject获取订阅时候的数据
- BehaviourSubject可以获取订阅之前的1个数据
- ReplaySubject能获取订阅前已经产生的所有数据
- AsyncSubject只获取最后一个数据
User user = new User();
user.setAge(1);
Observable.just(user).subscribe(new Action1<User>() {
@Override
public void call(User user) {
LOG.info("justVerify just user age:{}", user.getAge());
}
});
复制代码
Observable.just(1).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LOG.info("accept:{}", integer);
}
});
复制代码
产生Observable的数据 可以与 产生Observable的数据 对比下区别,大致就可以转换出过来了。
背压:如果生产太快,消费不过来该怎么办?可以反过来限制生产的速度,比如把所有生产的数据都缓存起来,消费者自己来取,比如只有有请求的时候才给数据,没有请求的数据全部扔掉