上一篇我们初步认识了 StateFlow
和 SharedFlow
,为了有个相对更加清晰的认识,我们来看下它们的具体实现。
先来看看我们常用的 StateFlow(MutableStateFlow)
MutableStateFlow
实际是一个包括了一个 value
的接口。我们使用的 MutableStateFlow
则是一个创建具体子类的方法:
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
StateFlow
的实现类是 StateFlowImpl
类。我们看下 setValue
:
// StateFlowImpl setValue
public override var value: T
get() = NULL.unbox(_state.value)
set(value) { updateState(null, value ?: NULL) }
// StateFlowImple updateState
private fun updateState(expectedState: Any?, newState: Any): Boolean {
var curSequence = 0
var curSlots: Array<StateFlowSlot?>? = this.slots
synchronized(this) {
val oldState = _state.value
if (expectedState != null && oldState != expectedState) return false
if (oldState == newState) return true
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++
sequence = curSequence
} else {
sequence = curSequence + 2
return true
}
curSlots = slots
}
while (true) {
curSlots?.forEach {
it?.makePending()
}
synchronized(this) {
if (sequence == curSequence) {
sequence = curSequence + 1
return true
}
curSequence = sequence
curSlots = slots
}
}
}
这里可以看到
StateFlow
更新值是线程安全的。LiveData
的 distinctUntilChanged
。StateFlowImpl
维护了一个 Slot
数组,每个collect对应了其中一个 Slot
。makePending
来标记 Slot
的状态:// StateFlowSlot makePending
fun makePending() {
_state.loop { state ->
when {
state == null -> return
state === PENDING -> return
state === NONE -> {
if (_state.compareAndSet(state, PENDING)) return
}
else -> {
if (_state.compareAndSet(state, NONE)) {
(state as CancellableContinuationImpl<Unit>).resume(Unit)
return
}
}
}
}
}
当状态还是 Pending
的时候,会忽略。否则如果是在协程里运行,则会恢复协程。接着看 StateFlowImpl
的 collect
方法:
// StateFlowImpl collect
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
try {
val collectorJob = currentCoroutineContext()[Job]
var oldState: Any? = null
while (true) {
val newState = _state.value
collectorJob?.ensureActive()
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
oldState = newState
}
if (!slot.takePending()) {
slot.awaitPending()
}
}
} finally {
freeSlot(slot)
}
}
这里会有一个 while(true) 循环,使用我们在代码中写 collect 的时候应该写成这样:
scope.launch {
state1.collect {}
}
scope.launch {
state2.collect {}
}
千万不要写成
scope.launch {
state1.collect {}
state2.collect {}
}
这样 state2 的 collect 不会执行。
关于 collect:
Slot
Pending
的时候,执行 awaitPending
方法//StateFlowSlot awaitPending
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
if (_state.compareAndSet(NONE, cont)) return@sc
cont.resume(Unit)
}
这里因为不在 Pending
状态了,所以也没有必要持续死循环了,所以这里会挂起协程。并且把协程的 Continuation
对象赋值给 _state。这样当发送新的值的时候,协程会被恢复执行。所以 StateFlow
的 collect
方法在没有值更新的时候也是会挂起协程不消耗系统资源的。
接着看下 SharedFlow
的实现, SharedFlow
实现比 StateFlow
复杂一点。我们经常使用的 MutableSharedFlow
也是一个创建 Flow 对象的方法。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
这里 bufferCapacity
参数传的是 replay
和 extraBufferCapacity
的和。SharedFlowImpl
内部维护了一个缓存队列。用来存储数据流的内容。总体结构在代码的注释中可以看到:
/*
Logical structure of the buffer
buffered values
/-----------------------\
replayCache queued emitters
/----------\/----------------------\
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E | | | |
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
^ ^ ^ ^
| | | |
head | head + bufferSize head + totalSize
| | |
index of the slowest | index of the fastest
possible collector | possible collector
| |
| replayIndex == new collector's index
\---------------------- /
range of possible minCollectorIndex
head == minOf(minCollectorIndex, replayIndex) // by definition
totalSize == bufferSize + queueSize // by definition
INVARIANTS:
minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
replayIndex <= head + bufferSize
*/
看下发射新数据的 emit
方法:
// SharedFlowImpl emit
override suspend fun emit(value: T) {
if (tryEmit(value)) return
emitSuspend(value)
}
// tryEmit
override fun tryEmit(value: T): Boolean {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
}
}
for (cont in resumes) cont?.resume(Unit)
return emitted
}
tryEmit
方法是线程安全的,里面会调用 tryEmitLocked
:
// SharedFlowImpl tryEmitLocked
private fun tryEmitLocked(value: T): Boolean {
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value)
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
when (onBufferOverflow) {
BufferOverflow.SUSPEND -> return false
BufferOverflow.DROP_LATEST -> return true
BufferOverflow.DROP_OLDEST -> {}
}
}
enqueueLocked(value)
bufferSize++
if (bufferSize > bufferCapacity) dropOldestLocked()
if (replaySize > replay) {
updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
}
return true
}
当发生背压,处理没有发射快的时候(缓存size大于等于缓存容量并且最小的收集器下标小于等于replay的下标),会根据 onBufferOverflow
的值选择不同的处理。
emitSuspend
,内部也会调用到 enqueueLocked
enqueueLocked
把新值加入队列。// SharedFlowImple enqueueLocked
private fun enqueueLocked(item: Any?) {
val curSize = totalSize
val buffer = when (val curBuffer = buffer) {
null -> growBuffer(null, 0, 2)
else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
}
buffer.setBufferAt(head + curSize, item)
}
这里会把值加到 buffer 里面。如果 buffer 队列容量不足,会进行 2 倍的扩容。接着会调用 findSlotsToResumeLocked
去恢复。
接着看下 collect
的实现:
// SharedFlowImpl collect
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
while (true) {
var newValue: Any?
while (true) {
newValue = tryTakeValue(slot) // attempt no-suspend fast path first
if (newValue !== NO_VALUE) break
awaitValue(slot) // await signal that the new value is available
}
collectorJob?.ensureActive()
collector.emit(newValue as T)
}
} finally {
freeSlot(slot)
}
}
tryTakeValue
会在 while(true) 里调用,这里会调用 getPeekedValueLockedAt
,从 buffer 里面获取数据。如果拿到了数据,那么这个循环终止。去执行我们传入的逻辑。如果没有数据,则会调用 awaitValue
挂起协程。
// SharedFlowImple getPeekedValueLockedAt
private fun getPeekedValueLockedAt(index: Long): Any? =
when (val item = buffer!!.getBufferAt(index)) {
is Emitter -> item.value
else -> item
}
当然 buffer 的维护逻辑还是比较复杂的,这部分内容在 updateCollectorIndexLocked
和 tryTakeValue
里。感兴趣的读者可以自行研究。这里 StateFlow
和 SharedFlow
的逻辑基本就理清楚了。通过一些大概的实现思路,我们可以更清楚知道两者的差异,避免一些使用过程中的困扰。