响应式编程
非阻塞的异步编程,映射在代码中本质就是回调函数,与响应式编程模型对应的是传统的 指令式编程
;
以Java 9的api为例:
发布者, 数据输入的对象, T表示数据的类型
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
中间代理, 发布者和订阅者并没有直接的联系,而是将数据的传递控制 从数据和数据的变化里分离出来,进而降低功能之间的耦合
public static interface Subscription {
public void request(long n);
public void cancel();
}
订阅者,T表述数据的类型,分别规定了四种情形下的反应:
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
处理器,同时实现了Publisher和Subscriber,也就是做一个承上启下的作用,是流在执行过程中数据处理的中间流程;
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
使用Flow的api 实现: a=b+c
a: 假设需要知道一件事情结束的时间是周几
b:数据开始的时间是周几 周五周六周日不好好干活,推到周一 ,实际开始时间就是周一,
c:处理完这件事情的时间需要几天 , 周一 需要两天, 周二周三周四需要一天,
NOTE | 不使用多线程,仅仅想对响应式的代码执行情况做一些了解 |
---|
bc不拆分,快速实现业务
public class TimePublisher implements Flow.Publisher<LocalDate> {
public final LinkedList<LocalDate> items;
public TimePublisher(LinkedList<LocalDate> items) {
this.items = items;
}
@Override
public void subscribe(Flow.Subscriber subscriber) {
subscriber.onSubscribe(new TimeSubscription(subscriber, items));
}
}
public class TimeSubscription implements Flow.Subscription {
private final Flow.Subscriber<LocalDate> subscriber;
private LinkedList<LocalDate> list;
public TimeSubscription(Flow.Subscriber<LocalDate> subscriber, LinkedList<LocalDate> list) {
this.subscriber = subscriber;
this.list = list;
}
@Override
public void request(long n) {
if (list.isEmpty()) {
subscriber.onComplete();
return;
}
for (long i = 0; i < n; i++) {
LocalDate item = list.remove();
subscriber.onNext(item);
}
}
@Override
public void cancel() {
//标记为删除; 关闭资源等
}
}
public class TimeSubscriber implements Flow.Subscriber<LocalDate> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(LocalDate item) {
DayOfWeek dayOfWeek = item.getDayOfWeek();
int days = startAfterDay(dayOfWeek);
LocalDate localDate = item.plusDays(days);
System.out.println("完成日期为: " + localDate);
subscription.request(1);
}
private int startAfterDay(DayOfWeek dayOfWeek) {
return switch (dayOfWeek) {
case MONDAY -> 2;
case TUESDAY, THURSDAY, WEDNESDAY -> 1;
case FRIDAY -> 5;
case SATURDAY -> 4;
case SUNDAY -> 3;
};
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
public static void main(String[] args) {
//第一步, 声明一个发布者
LinkedList<LocalDate> linkedList = new LinkedList();
linkedList.add(LocalDate.now());
TimePublisher timePublisher = new TimePublisher(linkedList);
//第二部,声明过程,
TimeSubscriber timeSubscriber = new TimeSubscriber();
timePublisher.subscribe(timeSubscriber);
}
将执行过程拆解为两个过程执行,使用Processor,
当天是周几是流的中间结果,即是上游的结果又是下游的数据来源; 尝试下代码的编写
这里直接将第一阶段的代码进行拆分,因此Publisher和Subscription不变,而subscriber则变成了Processor,成为了中间处理器
public class TimeProcessor implements Flow.Processor<LocalDate,DayOfWeek> {
private Flow.Subscription subscription;
public TimeProcessor() {
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(LocalDate item) {
DayOfWeek dayOfWeek = item.getDayOfWeek();
emit(dayOfWeek);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
// ------- 下一个流水线,
private void emit(DayOfWeek dayOfWeek) {
subscribe(daySubscriber);
daySubscription.emit(dayOfWeek);
}
DaySubscription daySubscription;
@Override
public void subscribe(Flow.Subscriber subscriber) {
if (daySubscription == null) {
return;
}
DaySubscription subscription = new DaySubscription((DaySubscriber) subscriber);
this.daySubscription = subscription;
daySubscriber.onSubscribe(subscription);
}
DaySubscriber daySubscriber;
public TimeProcessor downStream(DaySubscriber daySubscriber) {
this.daySubscriber = daySubscriber;
return this;
}
}
下游的中介
demo为了省事直接写到了发布者的构造器中; 但是中间状态这里增加一个发出数据的方法; 发射数据后中介会调用订阅者消费消息
public class DaySubscription implements Flow.Subscription {
private final DaySubscriber dayProcessor;
private DayOfWeek dayOfWeek;
public DaySubscription(DaySubscriber daySubscriber) {
this.dayProcessor = daySubscriber;
}
public void emit(DayOfWeek dayOfWeek) {
this.dayOfWeek = dayOfWeek;
request(1);
}
@Override
public void request(long n) {
dayProcessor.onNext(dayOfWeek);
}
@Override
public void cancel() {
}
}
下游订阅者
这个地方为了简单,并没有使用到背压; 因为作为demo,没有使用任何异步.所以onNext仅仅执行消费数据的逻辑
public class DaySubscriber<T> implements Flow.Subscriber<DayOfWeek> {
private Flow.Subscription daySubscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.daySubscription = subscription;
}
@Override
public void onNext(DayOfWeek item) {
int day = switch (item) {
case MONDAY -> 2;
case TUESDAY, THURSDAY, WEDNESDAY -> 1;
case FRIDAY -> 5;
case SATURDAY -> 4;
case SUNDAY -> 3;
};
System.out.printf("任务开始于: %s , 将在 %d 天后开始执行 ", item, day);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
考虑简单; 仅仅有一个中间处理器来熟悉响应式的基本逻辑
本质上需要指定步骤,代码类似于上面的流程图
public static void main(String[] args) {
LinkedList<LocalDate> linkedList = new LinkedList<>();
linkedList.add(LocalDate.now());
TimePublisher timePublisher = new TimePublisher(linkedList);
timePublisher.subscribe(new TimeProcessor().downStream(new DaySubscriber()));
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。