返回多个值
//返回了多个值,是同步
fun simpleSequence():Sequence<Int> = sequence {
for (i in 1..3){
Thread.sleep(1000)//阻塞
//因为有RestrictsSuspension注解,无法调用挂起函数,所以delay会报错
//delay(1000)
yield(i)
}
}
fun simpleFlow() = flow<Int>{
println("flow started")
for (i in 1..3){
delay(1000)
emit(i)
}
}
@Test
fun `test flow`() = runBlocking {
val flow = simpleFlow()
println("calling collect...")
flow.collect {value -> println(value)}
println("calling collect again...")
flow.collect {value -> println(value)}
}
//会输出string 2;string 4
@Test
fun `test flow2`() = runBlocking {
(1..5).asFlow().filter {
it % 2 == 0
}.map {
"string %it"
}.collect {
println("collect $it")
}
}
fun simpleFlow() = flow<Int>{
println("flow started")
for (i in 1..3){
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.IO)
使用launchIn替换collect,我们可以在单独的协程中启动流的收集
fun events() = (1..3)
.asFlow()
.onEach { delay(100) }
.flowOn(Dispatchers.Default)
@Test
fun `test flow launch`() = runBlocking {
val job = events()
.onEach { event -> println("event:$event") }
.launchIn(CoroutineScope(Dispatchers.IO))
delay(200)
job.cancelAndJoin()
}
launchIn需要传入协程作用域,返回的是Job,这样可以方便的取消停止流
流采用与协程同样的协作取消。流的收集可以是当流在一个可取消的挂起函数(例如delay)中挂起的时候取消
fun simpleFlow3() = flow<Int> {
for (i in 1..3) {
delay(1000)
println(1000)
emit(i)
}
}
@Test
fun `test cancel flow`() = runBlocking {
withTimeoutOrNull(2500){
simpleFlow3().collect { value -> println(value) }
}
println("done")
}
fun simpleFlow3() = flow<Int> {
for (i in 1..5) {
delay(1000)
println(1000)
emit(i)
}
}
//只会输出到3
@Test
fun `test cancel flow3`() = runBlocking {
simpleFlow3().collect { value ->
println(value)
if (value == 3){
cancel()
}
}
//如果不加cancellable()就不能取消
(1..5).asFlow().cancellable().collect { value ->
println(value)
if (value == 3){
cancel()
}
}
}
背压:生产者效率大于消费者效率
@Test
fun `test flow back pressure`() = runBlocking {
val time = measureTimeMillis {
simpleFlow3()
.buffer(50)
//.conflate()
//.flowOn(Dispatchers.Default)
.collect { value ->
delay(300)
println("collected $value")
}
}
}
末端操作符是在流上用于启动流收集的挂起函数,collect是最基本的末端操作符
@Test
fun `test flow operator`() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it }
.reduce{a,b -> a+b}
//输出55
println(sum)
}
@Test
fun `test flow zip`() = runBlocking<Unit> {
val nums = (1..3).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs){ a,b -> "$a -> $b"}.collect { print(it) }
//会输出1->one,2->two,3->three
}
当运算符中的发射器或代码抛出异常时,处理方法:
@Test
fun `test flow exception`() = runBlocking<Unit> {
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { e:Throwable ->
println("catch $e")
//补充元素
emit(10)
}.flowOn(Dispatchers.IO).collect{println(it)}
}
当流收集完成时(普通情况或异常情况),它可能需要执行一个动作
@Test
fun `test flow exception onCompletion`() = runBlocking<Unit> {
flow {
emit(1)
throw ArithmeticException("Div 0")
}.onCompletion { exception ->
if (exception != null){
println("flow completed exception: $exception")
}}
.catch { e:Throwable ->
println("catch $e")
}.collect{println(it)}
}
码字不易,求转发,求点在看,求关注,感谢!