我已经使用RxJava2很长时间了。最近,我正在研究RxJava2的源代码,试图理解它,编写我自己的运算符实现。我发现有很多线程安全的代码。但我认为有些是多余的,这增加了学习的难度。
根据Reactive Streams specs 2.7,“订阅者必须确保其订阅上的所有调用都发生在同一线程中,或者提供各自的外部同步。”调用者不是有责任确保它的调用是线程安全的吗?
但是我发现"io.reactivex.internal.operators.flowable.FlowableLimit$LimitSubscriber.request“这个方法使用AtomicLong来实现原子性。许多其他代码也是这样做的。为什么?
发布于 2018-07-27 09:27:12
但是我认为有些是多余的,这增加了学习的难度。
在实现运算符时,必须考虑来自不同来源/消费者的各种信号类型的许多可能的并发交互。我花了很多时间研究和优化RxJava 2。并发性是很难的,我非常确定实际上没有“多余”的实现细节。总有一个原因,为什么有些算法看起来是这样的。然而,可能有一些优化的可能性,但这些都必须仔细分析和思考。
根据Reactive Streams规范2.7的
在那里,have been和are currently对规则的措辞进行了一些讨论和改进,给出了RxJava和项目反应堆等实际实现的结果。
不是调用者的责任来确保它的调用是线程安全的吗?
在调用Subscriber.onXXX
方法时,特定运算符的Publisher
通常处于确保线程安全的最佳位置。否则,Subscriber
将需要更多地了解其源是什么或做什么。
但是我找到了"io.reactivex.internal.operators.flowable.FlowableLimit$LimitSubscriber.request“
有许多方法可以确保线程安全。“同步”一词并不一定意味着必须使用Java关键字synchronized
。RxJava主要使用所谓的无锁算法,这些算法利用java.util.concurrent.atomic
类提供的原子操作。请求管理通常是无锁原子的“客户端”。
发布于 2018-07-27 09:17:28
来自你正在使用的API的订阅对象" RXJava2“本身并不是线程安全的,但是如果你使用的是RXJava2,你必须使它成为线程安全的。或者只使用单个线程。
"A Subscriber MUST ensure that all calls on its Subscription are thread Safe"
这基本上是一个警告,说多线程可能会出现奇怪的情况,当多个线程遇到用"Subscribtion“类编写的代码时,你应该认真考虑一下后果。
但我不能回答的问题,因为可能是一些人开始更改代码,并试图使其更线程安全(位)代码演变和更改,它在文档中警告关于哪些部分还不是线程安全。
希望这能有所帮助。
https://stackoverflow.com/questions/51554475
复制