“最近马斯克收购了推特之后,马上就裁掉了 50% 的推特员工,这不禁让我想起了灭霸的响指... 还有苹果、亚马逊冻结招聘,英特尔、Lyft开启裁员计划,国内外都不好过啊,大家都开始勒紧裤腰带了···那么,我们打工人是不是也该刷刷题了···(笑Cry.jpg)
Kotlin 学习笔记艰难地来到了第五篇~ 在这一篇主要会说 Flow 的基本知识和实例。由于 Flow 内容较多,所以会分几个小节来讲解,这是第一小节,文章后面会结合一个实例介绍 Flow 在实际开发中的应用。
首先回想一下,在协程中处理某个操作,我们只能返回单个结果;而 Flow 可以按顺序返回多个结果,在官方文档中,Flow 被翻译为 数据流
,这也说明了 Flow 适用于多值返回的场景。
Flow 是以协程为基础构建的,所以它可通过异步的方式处理一组数据,所要处理的数据类型必须相同,比如:Flow<Int>
是处理整型数据的数据流。
Flow 一般包含三个部分: 1)提供方:负责生成数据并添加到 Flow 中,得益于协程,Flow 可以异步生成数据; 2)中介(可选):可对 Flow 中的值进行操作、修改;也可修改 Flow 本身的一些属性,如所在线程等; 3)使用方:接收并使用 Flow 中的值。 提供方:生产者,使用方:消费者,典型的生产者消费者模式。
Flow 是一个异步数据流,它可以顺序地发出数据,通过流上的一些中间操作得出结果;若出错可抛出异常。这些 “流上的中间操作” 包括但不限于 map
、filter
、take
、zip
等等方法。这些中间操作是链式的,可以在后面再次添加其他操作方法,并且也不是挂起函数,它们只是构建了一条链式的操作并实时返回结果给后面的操作步骤。
流上的终端操作符要么是挂起函数,例如 collect
、single
、toList
等等,要么是在给定作用域内开始收集流的 launchIn
操作符。前半句好理解,后半句啥意思?这就得看一下 launchIn
这个终端操作符的作用了。它里面是这样的:
//code 1
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
原来 launchIn
方法可以传入一个 CoroutineScope
协程作用域,然后在这个作用域里面调用 collect
方法。lifecycleScope
、MainScope()
这些都是协程作用域,所以 launchIn
方法只不过是 scope.launch { flow.collect() }
的一种简写。
流的执行也被称之为收集流,并且是以挂起的方式,不是阻塞的。流最终的执行成功与否取决于流上的操作是否全部执行成功。collect
函数就是最常见的收集流函数。
冷流(Cold Flow):在数据被使用方订阅后,即调用 collect
方法之后,提供方才开始执行发送数据流的代码,通常是调用 emit
方法。即不消费,不生产,多次消费才会多次生产。使用方和提供方是一对一的关系。
热流(Hot Flow):无论有无使用方,提供方都可以执行发送数据流的操作,提供方和使用方是一对多的关系。热流就是不管有无消费,都可生产。
SharedFlow
就是热流的一种,任何流也可以通过 stateIn
和 shareIn
操作转化为热流,或者通过 produceIn
操作将流转化为一个热通道也能达到目的。本篇只介绍冷流相关知识,热流会在后面小节讲解~
Flow 的构造方法有如下几种:
1、 flowOf()
方法。用于快速创建流,类似于 listOf()
方法,下面是它的源码:
//code 2
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
所以用法也比较简单:
//code 3
val testFlow = flowOf(65,66,67)
lifecycleScope.launch {
testFlow.collect {
println("输出:$it")
}
}
//打印结果:
//输出:65
//输出:66
//输出:67
注意到 Flow 初始化的时候跟其他对象一样,作用域在哪儿都可以,但 collect
收集的时候就需要放在协程里了,因为 collect
是个挂起函数。
2、asFlow()
方法。是集合的扩展方法,可将其他数据转换成 Flow,例如 Array
的扩展方法:
//code 4
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
不仅 Array
扩展了此方法,各种其他数据类型的数组都扩展了此方法。所以集合可以很方便地构造一个 Flow。
3、flow {···}
方法。这个方法可以在其内部顺序调用 emit
方法或 emitAll
方法从而构造一个顺序执行的 Flow。emit
是发射单个值;emitAll
是发射一个流,这两个方法分别类似于 list.add(item)
、list.addAll(list2)
方法。flow {···}
方法的源码如下:
//code 5
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
需要额外注意的是,flow
后面的 lambda 表达式是一个挂起函数,里面不能使用不同的 CoroutineContext
来调用 emit
方法去发射值。因此,在 flow{...}
中不要通过创建新协程或使用 withContext
代码块在另外的 CoroutineContext
中调用 emit
方法,否则会报错。如果确实有这种需求,可以使用 channelFlow
操作符。
//code 6
val testFlow = flow {
emit(23)
// withContext(Dispatchers.Main) { // error
// emit(24)
// }
delay(3000)
emitAll(flowOf(25,26))
}
4、channelFlow {···}
方法。这个方法就可以在内部使用不同的 CoroutineContext
来调用 send
方法去发射值,而且这种构造方法保证了线程安全也保证了上下文的一致性,源码如下:
//code 7
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
ChannelFlowBuilder(block)
一个简单的使用例子:
//code 8
val testFlow1 = channelFlow {
send(20)
withContext(Dispatchers.IO) { //可切换线程
send(22)
}
}
lifecycleScope.launch {
testFlow1.collect {
println("输出 = $it")
}
}
5、MutableStateFlow
和 MutableSharedFlow
方法:都可以定义相应的构造函数去创建一个可以直接更新的热流。由于篇幅有限,有关热流的知识后面小节会再说明。
Flow 的使用依赖于众多的操作符,这些操作符可以大致地分为 中间操作符 与 末端操作符 两大类。中间操作符是流上的中间操作,可以针对流上的数据做一些修改,是链式调用。中间操作符与末端操作符的区别是:中间操作符是用来执行一些操作,不会立即执行,返回值还是个 Flow;末端操作符就会触发流的执行,返回值不是 Flow。
一个完整的 Flow 是由 Flow 构建器
、Flow 中间操作符
、Flow 末端操作符
组成,如下示意图所示:
最常见的当然是 collect
操作符。它是个挂起函数,需要在协程作用域中调用;并且它是一个末端操作符,末端操作符就是实际启动 Flow 执行的操作符,这一点跟 RxJava 中的 Observable
对象的执行很像。
熟悉 RxJava 的同学知道,在 RxJava 中,Observable
对象的执行开始时机是在被一个订阅者(subscriber
) 订阅(subscribe
) 的时候,即在 subscribe
方法调用之前,Observable
对象的主体是不会执行的。
Flow 也是相同的工作原理,Flow 在调用 collect
操作符收集流之前,Flow 构建器和中间操作符都不会执行。举个栗子:
//code 9
val testFlow2 = flow {
println("++++ 开始")
emit(40)
println("++++ 发出了40")
emit(50)
println("++++ 发出了50")
}
lifecycleScope.launch {
testFlow2.collect{
println("++++ 收集 = $it")
}
}
// 输出结果:
//com.example.myapplication I/System.out: ++++ 开始
//com.example.myapplication I/System.out: ++++ 收集 = 40
//com.example.myapplication I/System.out: ++++ 发出了40
//com.example.myapplication I/System.out: ++++ 收集 = 50
//com.example.myapplication I/System.out: ++++ 发出了50
从输出结果可以看出,每次到 collect
方法调用时,才会去执行 emit
方法,而在此之前,emit
方法是不会被调用的。这种 Flow 就是冷流。
reduce
也是一个末端操作符,它的作用就是将 Flow 中的数据两两组合接连进行处理,跟 Kotlin 集合中的 reduce
操作符作用相同。举个栗子:
//code 10
private fun reduceOperator() {
val testFlow = listOf("w","i","f","i").asFlow()
CoroutineScope(Dispatchers.Default).launch {
val result = testFlow.reduce { accumulator, value ->
println("+++accumulator = $accumulator value = $value")
"$accumulator$value"
}
println("+++final result = $result")
}
}
//输出结果:
//com.example.myapplication I/System.out: +++accumulator = w value = i
//com.example.myapplication I/System.out: +++accumulator = wi value = f
//com.example.myapplication I/System.out: +++accumulator = wif value = i
//com.example.myapplication I/System.out: +++final result = wifi
看结果就知道,reduce
操作符的处理逻辑了,两个值处理后得到的新值作为下一轮中的输入值之一,这就是两两接连进行处理的意思。
图1 中出现的 toList
操作符也是一种末端操作符,可以将 Flow 返回的多个值放进一个 List
中返回,返回的 List
也可以自己设置,比较简单,感兴趣的同学可自行动手试验。
zip
顾名思义,就是可以将两个 Flow 汇合成一个 Flow,举个栗子就知道了:
//code 11
lateinit var testFlow1: Flow<String>
lateinit var testFlow2: Flow<String>
private fun setupTwoFlow() {
testFlow1 = flowOf("Red", "Blue", "Green")
testFlow2 = flowOf("fish", "sky", "tree", "ball")
CoroutineScope(Dispatchers.IO).launch {
testFlow1.zip(testFlow2) { firstWord, secondWord ->
"$firstWord $secondWord"
}.collect {
println("+++ $it +++")
}
}
}
// 输出结果:
//com.example.myapplication I/System.out: +++ Red fish +++
//com.example.myapplication I/System.out: +++ Blue sky +++
//com.example.myapplication I/System.out: +++ Green tree +++
//zip 方法声明:
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
从 zip
方法的声明中可知,zip
方法的第二个参数就是针对两个 Flow 进行各种处理的挂起函数,也可如例子中写成尾调函数的样子,返回值是处理之后的 Flow。而且当两个 Flow 长度不一样时,最后的结果会默认剔除掉先前较长的 Flow 中的元素。所以 testFlow2
中的 “ball” 就被自动剔除掉了。
正如 RxJava 框架中的 subscribe
方法可以通过传入 Observer
对象在其 onNext
、onComplete
、onError
返回之前处理的结果,Flow 也有诸如 catch
、onCompletion
等操作符去处理执行的结果。例如下面的代码:
//code 12
private fun handleExceptionDemo() {
val testFlow = (1..5).asFlow()
CoroutineScope(Dispatchers.Default).launch {
testFlow.map {
check(it != 3) {
//it == 3 时,会走到这里
println("+++ catch value = $it")
}
println("+++ not catch value = $it")
it * it
}.onCompletion {
println("+++ onCompletion value = $it")
}.catch { exception ->
println("+++ catch exception = $exception")
}.collect{
println("+++ collect value = $it")
}
}
}
//输出结果:
//com.example.myapplication I/System.out: +++ not catch value = 1
//com.example.myapplication I/System.out: +++ collect value = 1
//com.example.myapplication I/System.out: +++ not catch value = 2
//com.example.myapplication I/System.out: +++ collect value = 4
//com.example.myapplication I/System.out: +++ catch value = 3
//com.example.myapplication I/System.out: +++ onCompletion value = java.lang.IllegalStateException: kotlin.Unit
//com.example.myapplication I/System.out: +++ catch exception = java.lang.IllegalStateException: kotlin.Unit
顺着代码咱先来看看一些常用的 Flow 中间操作符。
1)map
:用来将 Flow 中的数据一个个拿出来做各自的处理,然后交给下一个操作符;本例中就是将 Flow 中的数据进行平方处理;
2)check()
:类似于一个检查站,满足括号内条件的数据可以通过,不满足则交给它的尾调函数处理,并且抛出异常;
3)onCompletion
:Flow 最后的兜底器。无论 Flow 最后是执行完成、被取消、抛出异常,都会走到 onCompletion
操作符中,类似于在 Flow 的 collect
函数外加了个 try
,finally
。官方给了个小栗子,还是很清楚的:
//code 13
try {
myFlow.collect { value ->
println(value)
}
} finally {
println("Done")
}
//上述代码可以替换为下面的代码:
myFlow
.onEach { println(it) }
.onCompletion { println("Done") }
.collect()
所以,在 code 12 中的 onCompletion
操作符可以接住从 check
那儿抛出的异常;
4)catch
:不用多说,专门用于捕捉异常的,避免程序崩溃。这里如果把 catch
去掉,程序就会崩溃。如果把 catch
和 onCompletion
操作符位置调换,则 onCompletion
里面就接收不到异常信息了,如图所示。
说了这么多,举个在实际中经常用到的数据请求的例子吧。先来看一个最简单的例子:
现在一般都是在 ViewModel 里持有 LiveData 数据,并且进行数据的请求,所以先来看下 ViewModel 中的代码实现:
//code 14
class SingleNetworkCallViewModel: ViewModel() {
private val users = MutableLiveData<Resource<List<ApiUser>>>()
private val apiHelperImpl = ApiHelperImpl(RetrofitBuilder.apiService)
fun fetchUsers() {
viewModelScope.launch {
users.postValue(Resource.loading(null))
apiHelperImpl.getUsers()
.catch { e ->
users.postValue(Resource.error(e.toString(), null))
}
.collect {
users.postValue(Resource.success(it))
}
}
}
fun getUsersData(): LiveData<Resource<List<ApiUser>>> {
return users
}
}
从代码可看出,fetchUsers
方法就是数据请求方法,里面的核心方法是 ApiHelperImpl
类对象的 getUsers
方法,在之前初始化 apiHelperImpl
对象时传入了一个 RetrofitBuilder.apiService
值,所以底层还是用到了 Retrofit 框架进行的网络请求。Retrofit 相关的代码如下:
//code 15
object RetrofitBuilder {
private const val BASE_URL = "https://xxxxxxx/"
private fun getRetrofit(): Retrofit {
return Retrofit.Builder()
.baseUrl(BASE_URL)
.addConverterFactory(GsonConverterFactory.create())
.build()
}
val apiService: ApiService = getRetrofit().create(ApiService::class.java)
}
//ApiService 中的代码也是一般常见的代码:
interface ApiService {
@GET("users")
suspend fun getUsers(): List<ApiUser>
}
再回过来看看 ViewModel 的代码,从 apiHelperImpl.getUsers
方法后面的 catch
和 collect
操作符也可看出,getUsers
方法返回的就是一个 Flow 对象,其使用的构造方法就是前文中说到的 flow{}
方法:
//code 16
class ApiHelperImpl(private val apiService: ApiService) : ApiHelper {
override fun getUsers(): Flow<List<ApiUser>> {
return flow { emit(apiService.getUsers()) }
}
}
ApiHelper
其实就是一个接口,规定了 ApiHelperImpl
中数据请求的方法名及返回值,返回值是一个 Flow,里面是我们最终需要的数据列表:
//code 17
interface ApiHelper {
fun getUsers(): Flow<List<ApiUser>>
}
Flow 调用 emit
发出去的就是 Retrofit 进行数据请求后返回的 List<ApiUser>
数据。
如何在 Activity 中使用就是之前使用 LiveData 的常规操作了:
//code 18
private fun setupObserver() {
viewModel.getUsersData().observe(this, Observer {
when (it.status) {
Status.SUCCESS -> {
progressBar.visibility = View.GONE
it.data?.let { users -> renderList(users) }
recyclerView.visibility = View.VISIBLE
}
Status.LOADING -> {
progressBar.visibility = View.VISIBLE
recyclerView.visibility = View.GONE
}
Status.ERROR -> {
//Handle Error
progressBar.visibility = View.GONE
Toast.makeText(this, it.message, Toast.LENGTH_SHORT).show()
}
}
})
}
上述例子是最简单的单个数据接口请求的场景,如果是两个或是多个数据接口需要并行请求,该如何处理呢?这就需要用到之前说的 Flow 中的 zip
操作符了。接着上面的例子,再添加一个数据请求方法 getMoreUsers
,那么两个接口并行的例子为:
//code 18
fun fetchUsers() {
viewModelScope.launch {
users.postValue(Resource.loading(null))
apiHelper.getUsers()
.zip(apiHelper.getMoreUsers()) { usersFromApi, moreUsersFromApi ->
val allUsersFromApi = mutableListOf<ApiUser>()
allUsersFromApi.addAll(usersFromApi)
allUsersFromApi.addAll(moreUsersFromApi)
return@zip allUsersFromApi
}
.flowOn(Dispatchers.Default)
.catch { e ->
users.postValue(Resource.error(e.toString(), null))
}
.collect {
users.postValue(Resource.success(it))
}
}
}
两个数据接口请求的快慢肯定不一样,但不用担心,zip
操作符会等待两个接口的数据都返回之后才进行拼接并交给后面的操作符处理,所以这里还需要调用 flowOn
操作符将线程切换到后台线程中去挂起等待。但后面的 collect
操作符执行的代码是在主线程中,感兴趣的同学可以打印线程信息看看,这就需要了解一下 flowOn
操作符的用法了。
flowOn
方法可以切换 Flow 处理数据的所在线程,类似于 RxJava 中的 subscribeOn
方法。例如 flowOn(Dispatchers.Default)
就是将 Flow 的操作都放到后台线程中执行。
当 flowOn
操作符之前没有设置任何的协程上下文,那么 flowOn
操作符可以为它之前的操作符设置执行所在的线程,并不会影响它之后下游的执行所在线程。下面是一个简单例子:
//code 19
private fun flowOnDemo() {
val testFlow = (1..2).asFlow()
MainScope().launch {
testFlow
.filter {
println("1+++ $it ${Thread.currentThread().name}")
it != 3
}.flowOn(Dispatchers.IO)
.map {
println("2+++ $it ${Thread.currentThread().name}")
it*it
}.flowOn(Dispatchers.Main)
.filter {
println("3+++ $it ${Thread.currentThread().name}")
it!=25
}.flowOn(Dispatchers.IO)
.collect{
println("4+++ $it ${Thread.currentThread().name}")
}
}
}
//输出结果:
//com.example.myapplication I/System.out: 1+++ 1 DefaultDispatcher-worker-1
//com.example.myapplication I/System.out: 1+++ 2 DefaultDispatcher-worker-1
//com.example.myapplication I/System.out: 2+++ 1 main
//com.example.myapplication I/System.out: 2+++ 2 main
//com.example.myapplication I/System.out: 3+++ 1 DefaultDispatcher-worker-1
//com.example.myapplication I/System.out: 3+++ 4 DefaultDispatcher-worker-1
//com.example.myapplication I/System.out: 4+++ 1 main
//com.example.myapplication I/System.out: 4+++ 4 main
发现了么?flowOn
操作符只对最近的上游操作符线程负责,它下游的线程会自动切换到之前所在的线程。如果连续有两个或多个 flowOn
操作符切换线程,则会切换到首个 flowOn
操作符切换的线程中去:
//code 20
testFlow
.filter {
println("1+++ $it ${Thread.currentThread().name}")
it != 3 //最终会在 Main 主线程中执行
}.flowOn(Dispatchers.Main).flowOn(Dispatchers.IO).flowOn(Dispatchers.Default)
.collect{
println("4+++ $it ${Thread.currentThread().name}")
}
在 filter
后面连续有两个 flowOn
操作符,但最终会在 Main 线程中执行 filter
操作符中的逻辑。
整体上看,Flow 在数据请求时所扮演的角色是数据接收与处理后发送给 UI 层的作用,这跟 RxJava 的职责是相同的,而且两者都有丰富的操作符来处理各种不同的情况。不同的是 Flow 是将接收到的数据放到 Flow 载体中,而 RxJava 一般将数据放到 Observable
对象中;Flow 处理数据更加方便和自然,去除了 RxJava 中繁多且功能臃肿的操作符。
最后总结一下 Flow 第一小节的内容吧:
1)Flow 数据流可异步按顺序返回多个数据;
2)Flow 整体是由 构建器、中间操作符、末端操作符 组成;
3)冷流只有在调用末端操作符时,流的构造器和中间操作符才会开始执行;冷流的使用方和提供方是一对一的;
4)简单介绍了 collect
、reduce
末端操作符以及 zip
、map
等中间操作符的使用;
5)Flow 异常处理所用到的 catch
、check
、onCompletion
等操作符的用法;
6)Flow 在数据请求上的实例
所用实例来源:https://github.com/MindorksOpenSource/Kotlin-Flow-Android-Examples