响应式编程最重要的是解决生产者和消费者之间的关系。如果生产者产生的数据过大,而消费者消费不过来,就会压垮消费者。所以就需要有一个重要的概念——流控。
解决流控有几种方式
背压机制
如果生产者发出的数据比消费者能够处理数据的最大量还要多,消费者可能会被迫一直在获取和处理数据,消耗越来越多的资源,从而埋下潜在的崩溃风险。为了防止这一点,需要有一种机制使消费者可以通知生产者降低数据的生成速度。生产者可以采用多种策略来实现这一要求,这就是背压。
背压机制应该以非阻塞的方式工作。实现非阻塞背压的方法是放弃推策略而采用拉策略。
响应式流
响应式流规范是提供非阻塞背压的异步流处理标准的一种倡议。
响应式流接口
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
发布者(Publisher)是潜在的包含无限数量的有序元素的生产者,它根据收到的请求向当前订阅者发送元素。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
订阅者(Subscriber)从发布者那里订阅并接收元素。发布者向订阅者发送订阅令牌(Subscription Token)。使用订阅令牌,订阅者向发布者请求多个元素。当元素准备就绪时,发布者就会向订阅者发送合适数量的元素。
当执行发布者的subscribe()方法时,发布者会回调订阅者的onSubscribe()方法。在这个方法中,订阅者通常会借助传入的Subscription对象向发布者请求n个数据。然后发布者通过不断调用订阅者的onNext()方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete()方法告知订阅者流已经发完;如果有错误发生,则通过onError()方法发出错误数据,这同样也会终止数据流。
public interface Subscription {
public void request(long n);
public void cancel();
}
订阅(Subscription)表示订阅者订阅的一个发布者的令牌。当订阅请求成功时,发布者将其传递给订阅者。订阅者使用订阅令牌与发布者进行交互。例如,请求更多的元素或取消订阅。
当发布者调用subscribe()方法注册订阅者时,会通过订阅者的回调方法onSubscribe()传入Subscription对象,之后订阅者就可以使用这个Subscription对象的request()方法向发布者请求数据。背压机制的实现正是基于这一点。