前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >了解Kotlin Flow(二)

了解Kotlin Flow(二)

作者头像
烧麦程
发布2022-05-10 20:51:02
7390
发布2022-05-10 20:51:02
举报
文章被收录于专栏:半行代码半行代码

上一篇我们初步认识了 StateFlowSharedFlow ,为了有个相对更加清晰的认识,我们来看下它们的具体实现。

StateFlow

先来看看我们常用的 StateFlow(MutableStateFlow)

MutableStateFlow 实际是一个包括了一个 value 的接口。我们使用的 MutableStateFlow 则是一个创建具体子类的方法:

代码语言:javascript
复制
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

StateFlow 的实现类是 StateFlowImpl 类。我们看下 setValue:

代码语言:javascript
复制
// StateFlowImpl setValue
public override var value: T
  get() = NULL.unbox(_state.value)
  set(value) { updateState(null, value ?: NULL) }


// StateFlowImple updateState
private fun updateState(expectedState: Any?, newState: Any): Boolean {
  var curSequence = 0
  var curSlots: Array<StateFlowSlot?>? = this.slots
  synchronized(this) {
    val oldState = _state.value
    if (expectedState != null && oldState != expectedState) return false
    if (oldState == newState) return true
    _state.value = newState
    curSequence = sequence
    if (curSequence and 1 == 0) {
      curSequence++
   sequence = curSequence
    } else {
      sequence = curSequence + 2
      return true
    }
    curSlots = slots
  }
  
  while (true) {
    curSlots?.forEach {
       it?.makePending()
    }
    
    synchronized(this) {
      if (sequence == curSequence) {
        sequence = curSequence + 1
        return true
      }
      curSequence = sequence
      curSlots = slots
    }
  }
}

这里可以看到

  • StateFlow 更新值是线程安全的。
  • 当新旧的 state 一样的时候,不会有额外处理。也就是大家常说的防抖。类似 LiveDatadistinctUntilChanged
  • StateFlowImpl 维护了一个 Slot 数组,每个collect对应了其中一个 Slot
  • 调用 makePending 来标记 Slot 的状态:
代码语言:javascript
复制
// StateFlowSlot makePending
fun makePending() {
  _state.loop { state ->
    when {
     state == null -> return
      state === PENDING -> return
      state === NONE -> {
        if (_state.compareAndSet(state, PENDING)) return
      }
     else -> {
       if (_state.compareAndSet(state, NONE)) {
          (state as CancellableContinuationImpl<Unit>).resume(Unit)
          return
        }
      }
   }
 }
}

当状态还是 Pending 的时候,会忽略。否则如果是在协程里运行,则会恢复协程。接着看 StateFlowImplcollect 方法:

代码语言:javascript
复制
// StateFlowImpl collect
override suspend fun collect(collector: FlowCollector<T>) {
  val slot = allocateSlot()
  try {
    val collectorJob = currentCoroutineContext()[Job]
    var oldState: Any? = null
    while (true) {
      val newState = _state.value
      collectorJob?.ensureActive()
      if (oldState == null || oldState != newState) {
        collector.emit(NULL.unbox(newState))
        oldState = newState
      }
      
      if (!slot.takePending()) {
        slot.awaitPending()
      }
    }
  } finally {
    freeSlot(slot)
  }
}

这里会有一个 while(true) 循环,使用我们在代码中写 collect 的时候应该写成这样:

代码语言:javascript
复制
scope.launch {
  state1.collect {}
}

scope.launch {
 state2.collect {}
}

千万不要写成

代码语言:javascript
复制
scope.launch {
 state1.collect {}
  state2.collect {}
}

这样 state2 的 collect 不会执行。

关于 collect:

  • 首先会分配一个 Slot
  • 接着当值更新后,会执行我们传入的逻辑
  • 当 solt 状态不是 Pending 的时候,执行 awaitPending 方法
代码语言:javascript
复制
//StateFlowSlot awaitPending
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
  if (_state.compareAndSet(NONE, cont)) return@sc
  cont.resume(Unit)
}

这里因为不在 Pending 状态了,所以也没有必要持续死循环了,所以这里会挂起协程。并且把协程的 Continuation 对象赋值给 _state。这样当发送新的值的时候,协程会被恢复执行。所以 StateFlowcollect 方法在没有值更新的时候也是会挂起协程不消耗系统资源的。

SharedFlow

接着看下 SharedFlow 的实现, SharedFlow 实现比 StateFlow 复杂一点。我们经常使用的 MutableSharedFlow 也是一个创建 Flow 对象的方法。

代码语言:javascript
复制
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
  val bufferCapacity0 = replay + extraBufferCapacity
 val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0
 return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

这里 bufferCapacity 参数传的是 replayextraBufferCapacity 的和。SharedFlowImpl 内部维护了一个缓存队列。用来存储数据流的内容。总体结构在代码的注释中可以看到:

代码语言:javascript
复制
/*
        Logical structure of the buffer

                  buffered values
             /-----------------------\
                          replayCache      queued emitters
                          /----------\/----------------------\
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
         |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
               ^           ^           ^                      ^
               |           |           |                      |
              head         |      head + bufferSize     head + totalSize
               |           |           |
     index of the slowest  |    index of the fastest
      possible collector   |     possible collector
               |           |
               |     replayIndex == new collector's index
               \---------------------- /
          range of possible minCollectorIndex

          head == minOf(minCollectorIndex, replayIndex) // by definition
          totalSize == bufferSize + queueSize // by definition

       INVARIANTS:
          minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
          replayIndex <= head + bufferSize
     */

看下发射新数据的 emit 方法:

代码语言:javascript
复制
// SharedFlowImpl emit
override suspend fun emit(value: T) {
    if (tryEmit(value)) return 
    emitSuspend(value)
}

// tryEmit
override fun tryEmit(value: T): Boolean {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val emitted = synchronized(this) {
        if (tryEmitLocked(value)) {
            resumes = findSlotsToResumeLocked(resumes)
            true
        } else {
            false
        }
    }
    for (cont in resumes) cont?.resume(Unit)
    return emitted
}

tryEmit 方法是线程安全的,里面会调用 tryEmitLocked:

代码语言:javascript
复制
// SharedFlowImpl tryEmitLocked
private fun tryEmitLocked(value: T): Boolean {
   if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
   if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
     when (onBufferOverflow) {
       BufferOverflow.SUSPEND -> return false
       BufferOverflow.DROP_LATEST -> return true
       BufferOverflow.DROP_OLDEST -> {}
     }
   }
   enqueueLocked(value)
   bufferSize++
   if (bufferSize > bufferCapacity) dropOldestLocked()
   
   if (replaySize > replay) {
     updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
   }
   return true
}

当发生背压,处理没有发射快的时候(缓存size大于等于缓存容量并且最小的收集器下标小于等于replay的下标),会根据 onBufferOverflow 的值选择不同的处理。

  • suspend 返回false,继续执行 emitSuspend,内部也会调用到 enqueueLocked
  • drop latest 返回true,不处理
  • drop oldest 继续处理,调整缓存队列,并且调用 enqueueLocked 把新值加入队列。
代码语言:javascript
复制
// SharedFlowImple enqueueLocked
private fun enqueueLocked(item: Any?) {
  val curSize = totalSize
  val buffer = when (val curBuffer = buffer) {
    null -> growBuffer(null, 0, 2) 
    else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
  }
  buffer.setBufferAt(head + curSize, item)
}

这里会把值加到 buffer 里面。如果 buffer 队列容量不足,会进行 2 倍的扩容。接着会调用 findSlotsToResumeLocked去恢复。

接着看下 collect 的实现:

代码语言:javascript
复制
// SharedFlowImpl collect
override suspend fun collect(collector: FlowCollector<T>) {
    val slot = allocateSlot()
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        while (true) {
             var newValue: Any?
             while (true) {
                 newValue = tryTakeValue(slot) // attempt no-suspend fast path first
                 if (newValue !== NO_VALUE) break
                 awaitValue(slot) // await signal that the new value is available
             }
             collectorJob?.ensureActive()
             collector.emit(newValue as T)
        }
    } finally {
        freeSlot(slot)
    }
}

tryTakeValue 会在 while(true) 里调用,这里会调用 getPeekedValueLockedAt,从 buffer 里面获取数据。如果拿到了数据,那么这个循环终止。去执行我们传入的逻辑。如果没有数据,则会调用 awaitValue 挂起协程。

代码语言:javascript
复制
// SharedFlowImple getPeekedValueLockedAt
private fun getPeekedValueLockedAt(index: Long): Any? =
 when (val item = buffer!!.getBufferAt(index)) {
  is Emitter -> item.value
  else -> item
 }

当然 buffer 的维护逻辑还是比较复杂的,这部分内容在 updateCollectorIndexLockedtryTakeValue 里。感兴趣的读者可以自行研究。这里 StateFlowSharedFlow 的逻辑基本就理清楚了。通过一些大概的实现思路,我们可以更清楚知道两者的差异,避免一些使用过程中的困扰。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-02-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 半行代码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • StateFlow
  • SharedFlow
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档