文章接上篇,这一篇我们好好聊一聊协程的原理,通过上一篇的学习,相信大家对于如何使用协程已经非常熟悉了。
故事还得从上次的协程分享开始,由于大家对协程的实践并不多,所以大家对下面的这段代码如何执行争论不休:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a + b
Log.e(TAG,"result:$c")
}有人说,a 和 b 会串行执行,有人说,a 和 b 会并行执行,那么执行的结果到底是什么样的?我们将在下面的文章给出。

悲伤的故事
本个系列文章分为三篇,本文是第二篇:
“《即学即用Kotlin - 协程》 《抽丝剥茧Kotlin - 协程基础篇》 《抽丝剥茧Kotlin - 协程Flow篇》
首先,我们得明确协程中有哪些东西,如果你会使用协程,那你肯定知道协程中有 CoroutineScope、CoroutineContext 和 CoroutineDispatcher,这些都是使用过程中我们可以接触到的 API。
我简单的整理了协程中主要的基础类:

协程的类图
协程的类结构可分为三部分:CoroutineScope、CoroutineContext 和 Continuation。
如果你会使用协程,那你肯定知道,协程遇到耗时 suspend 操作可以挂起,等到任务结束的时候,协程会自动切回来。
它的奥秘就是 Continuation,Continuation 可以理解程续体,你可以理解其每次在协程挂起点将剩余的代码包括起来,等到结束以后执行剩余的内容。一个协程的代码块可能会被切割成若干个 Continuation,在每个需要挂起的地方都会分配一个 Continuation。
先抛出一些结论,协程在做耗时操作的时候,如果执行了耗时 suspend 操作,会自动挂起,但是这个耗时操作终究是要做的,只不过切换到其他线程去做了,做完以后协程就需要切回来,但是切到哪儿呢?这便是 Continuation 需要解决的问题。
Continuation 的流程是这样的:

无论是使用 launch 还是 async 启动的协程,都会有一个结束的时候用来回调的 continuation。
关于 CoroutineScope 没有特别多要说的,它持有了 CoroutineContext,主要对协程的生命周期进行管理。
一开始看 CoroutineContext 觉得特别晕,不明白为啥要这么设计,看了 Bennyhuo 大佬的文章以后才稍微好转。
从上面协程的类的机构中可以看出,光看这个 CoroutineContext 这个接口(源码内容我们下面讲),会发现它有点像 List 集合,而继承自 CoroutineContext 接口的 Element 接口则定义了其中的元素。
随后,这个 Element 接口被划分成了两种类,Job 和 ContinuationInterceptor:
Job:从字面上来讲,它代表一个任务,Thread 也是执行任务,所以我们可以理解它定义了协程的一些东西,比如协程的状态,协程和子协程的管理方式等等。ContinuationInterceptor:也从字面上来看,它是 Continuation 的拦截器,通过拦截 Continuation,完成我们想要完成的工作,比如说线程的切换。上面我们从概念上介绍了协程的三大件,在这部分,我们从源码分析。
suspend 修饰的方法会在在编译期间被编译器做特殊处理,这种处理被成为CPS(续体转换风格) 转化,suspend 方法会被包裹成 Continuation。
说了这么久的 Continuation,我们还没有见过接口代码,由于接口内容不多,我就把所有的内容贴出来了:
/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}我们重点关注Continuation#resumeWith()方法,从注释来看,通过返回 suspend 挂起点的值来恢复协程的执行,协程可以从参数 Result<T>) 获取成功的值或者失败的结果,如果没有结果,那么 Result<T> 的泛型是 Unit。Resulut 这个类也特别简单,感兴趣的同学可以查看源码。
BaseContinuationImpl 实现了 Continuation 接口,我们看一下 Continuation#resumeWith 方法的实现:
internal abstract class BaseContinuationImpl(
// 完成后调用的 Continuation
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 1. 执行 suspend 中的代码块
val outcome = invokeSuspend(param)
// 2. 如果代码挂起就提前返回
if (outcome === COROUTINE_SUSPENDED) return
// 3. 返回结果
Result.success(outcome)
} catch (exception: Throwable) {
// 3. 返回失败结果
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 4. 如果 completion 中还有子 completion,递归
current = completion
param = outcome
} else {
// 5. 结果通知
completion.resumeWith(outcome)
return
}
}
}
}
}主要的过程我在注释中已经标注出来了,我来解释一下 Continuation 的机制。
每个 suspend 方法生成的 BaseContinuationImpl,其构造方法有一个参数叫 completion,它也是一个 Continuation,它的调用时机是在 suspen 方法执行完毕的时候。我们后面称

Continuation流程
这个流程展示给我们的内容很直观了,简单起见,我们直接看3、4和5这一个 launch 启动流程就好,通常一个 launch 生成一个外层 Continuation一个相应的结果 Continuation,我们后面称结果 continuation 为 complete,Continuation 调用顺序是:
Continuation 中的 Continuation#resumeWith() 方法。launch 包裹的代码块,并返回一个结果。completion,由它完成协程结束的通知。上述的过程只存在于一个 launch 并且里面没有执行其他耗时的挂起操作,对于这些情况,我们将会在下面的文章讨论。
抛出问题一:可以看到,在注释2,遇到耗时的 suspend,返回的结果是一个 COROUTINE_SUSPENDED,后面会直接返回,耗时操作结束的时候,我们的 completion 怎么恢复呢?
在概要分析的时候,我们说 CoroutineContext 的结构像一个集合,是从它的接口得出结论的:
public interface CoroutineContext {
// get 方法,通过 key 获取
public operator fun <E : Element> get(key: Key<E>): E?
// 累加操作
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
// 操作符 + , 实际的实现调用了 fold 方法
public operator fun plus(context: CoroutineContext): CoroutineContext
// 移除操作
public fun minusKey(key: Key<*>): CoroutineContext
// CoroutineContext 定义的 Key
public interface Key<E : Element>
// CoroutineContext 中元素的定义
public interface Element : CoroutineContext {
// key
public val key: Key<*>
//...
}
}从中我们可以大致看出,CoroutineContext 中可以通过 Key 来获取元素 Element,并且 Element 接口也是继承自 CoroutineContext 接口。
除此以外,CoroutineContext 支持增加和移除操作,并且支持 + 操作符来完成增加。+ 操作符即 plus 方法是有具体实现的,感兴趣的可以自己看一下,主要涉及到了拦截器 ContinuationInterceptor 的添加。
Job 的注释中阐述定义是这样的:
“A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.
从中我们可以得出:
从后台任务的角度来看,Job 听着有点像 Thread,和 Thread 一样,Job 也有各种状态,文档中对 Job 各种状态的注释(感觉大佬们的注释写的真棒~):

Job 另一个值得关注的点是对子 Job 的管理,主要的规则如下:
Job都会结束的时候,父 Job 才会结束Job 取消的时候,子 Job 也会取消上述的一些内容都可以从 Job 的接口文档中得出。那么,Job哪里来的?如果你看一下CoroutineScope#launch方法,你就会得出结论,该方法的返回类型就是 Job,我们每次调用该方法,都会创建一个 Job。
顾名思义,Continuation 拦截器,先看接口:
interface ContinuationInterceptor : CoroutineContext.Element {
// ContinuationInterceptor 在 CoroutineContext 中的 Key
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
* 拦截 continuation
*/
fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//...
}这个接口可以提炼的就这两个信息:
Key,也就是说,无论你后面一个 CoroutineContext 放了多少个拦截器,Key 为 ContinuationInterceptor 的拦截器只能有一个。Continuation 在调用其 Continuation#resumeWith() 方法,会执行其 suspend 修饰的函数的代码块,如果我们提前拦截到,是不是可以做点其他事情,比如说切换线程,这也是 ContinuationInterceptor 的作用之一。需要说明一下,我们通过 Dispatchers 来指定协程发生的线程,Dispatchers 实现了 ContinuationInterceptor接口。
CoroutineScope 的接口很简单:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}它要求后续的实现都要提供 CoroutineContext,不过我们都知道,CoroutineContext 是协程中很重要的东西,既包括 Job,也包括调度器。
在上面的代码中,我多次使用了 Android Jetpack 中的 Lifecycle 中协程的扩展库,好处我们获取 CoroutineScope 更加简单,无需在组件 onDestroy 的时候手动 cancel,并且它的源码超级简单,前提是你会使用 Lifecycle:
internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
// ...
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
}并且它也支持你在指定的生命周期调用协程,大家看一下接口就明白了。
先上一段使用代码:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }
val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}
suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}虽然代码很简单,但是源码还是比较复杂的,我们分步讲。
我已经在上面说明了,我们使用的 Lifecycle 的协程拓展库,如果我们不使用拓展库,就得使用 MainScope,它们的 CoroutineContext 都是一样的:
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
// LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
// ...
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
// ...
return newScope
}
}显而易见,MainScope 和 LifecycleCoroutineScope 都使用了 SupervisorJob() + Dispatchers.Main, 作为它们的 CoroutineContext。
说明一下,SupervisorJob 和Dispatchers.Main 很重要,它们分别代表了CoroutineContext 之前提及的 Job 和 ContinuationInterceptor,后面用到的时候再分析。
直接进入 CoroutineScope#launch() 方法:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}上面的方法一共有三个参数,前两个不作过多介绍,第三个参数:
block: suspend CoroutineScope.() -> Unit)这是一个方法,是一个 lambda 参数,同时也表明了它需要被 suspend 修饰。继续看 launch 方法,发现它主要做了两件事:
CoroutineContextContinuation在第一行代码 val newContext = newCoroutineContext(context) 做了第一件事,这里的 newCoroutineContext(context) 是一个扩展方法:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}CoroutineScope 使用本身的 coroutineContext 集合,利用 + 操作符将我们在 launch 方法中提供的 coroutineContext 添加进来。
回到上一段代码,通常我们不会指定 start 参数,所以它会使用默认的 CoroutineStart.DEFAULT,最终 coroutine 会得到一个 StandaloneCoroutine。
StandaloneCoroutine 实现自 AbstractCoroutine,翻开上面的类图,你会发现,它实现了 Continuation、Job 和 CoroutineScope 等一堆接口。需要说明一下,这个 StandaloneCoroutine 其实是我们当前 Suspend Contination 的 complete。
接着会调用
coroutine.start(start, coroutine, block)这就表明协程开始启动了。
进入到 AbstractCoroutine#start 方法:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}跳过层层嵌套,最后到达了:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}虽然这仅仅是一个函数,但是后面主要的逻辑都揭露了:
Continuation。Continuation。Continuation#resumeWith 方法。Continuation我这里用了 又,因为我们在 launch 中已经创建了一个 AbstractContinuaion,不过它是一个 complete,从各个函数的行参就可以看出来。
不过我们 suspend 修饰的外层 Continuation 还没有创建,它来了,是 SuspendLambda,它继承自 ContinuationImpl,如果你问我为什么源码中没找到具体实现,我觉得可能跟 suspend 修饰符有关,由编译器处理,但是调用栈确实是这样的:

看一下 SuspendLambda 类的实现:
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
//...
}可以看到,它的构造方法的形参就包括一个 complete。
回到:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}里面的拦截方法 Continuation#intercepted() 方法是一个扩展方法:
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: thiscreateCoroutineUnintercepted(receiver, completion) 返回的是一个 SuspendLambda,所以它肯定是一个 ContinuationImpl,看一下它的拦截方法的实现:
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// ...
}在 ContinuationImpl#intercepted()方法中,直接利用 context 这个数据结构通过 context[ContinuationInterceptor] 获取拦截器。
我们都知道 ContinuationInterceptor 具有拦截作用,它的直接实现是 CoroutineDispatcher 这个抽象类,所有其他调度器都直接或者间接继承这个类,我们关注一下它的拦截方法:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// 1.拦截的 Continuation 被包了一层 DispatchedContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
//...
}
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
// ...
override fun resumeWith(result: Result<T>) {
// ...
if (dispatcher.isDispatchNeeded(context)) {
// 2. 后面一个参数需要提供 Runnable,父类已经实现
dispatcher.dispatch(context, this)
}
//...
}
// ...
}
// SchedulerTask 是一个 Runnable
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
// ...
public final override fun run() {
// ...
try {
//...
withCoroutineContext(context, delegate.countOrElement) {
// 3. continuation 是 DispatchedContinuation 包裹的 continuation
continuation.resume(...)
}
}
//...
}
}简单来说,就是对原有的 Continuation 的 resumeWith 操作加了一层拦截,就像这样:

拦截流程
加入 CoroutineDispatcher 以后,执行真正的 Continue#resumeWith() 之前,会执行 CoroutineDispatcher#dispatch() 方法,所以我们现在关注 CoroutineDispatcher#dispatch 具体实现即可。
首先我们得明确这个 CoroutineDispatcher 来自哪里?它从 context 获取,context来自哪里?
注意 SuspendLambda 和 ContinuationImpl 的构造方法,SuspendLambda 中的参数没有 CoroutineContext,所以只能来自 completion 中的 CoroutineContext,而completion 的 CoroutineContext 来自 launch 方法中来自 CoroutineScope,默认是 SupervisorJob() + Dispatchers.Main,不过只有 Dispatchers.Main 继承了 CoroutineDispatcher。
Dispatchers.Main 是一个 MainCoroutineDispatcher,Android 中对应的 MainCoroutineDispatcher 是 HandlerContext:
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
//...
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 利用主线程的 Handler 执行任务
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
// 利用主线程的 Handler 延迟执行任务,将完成的 continuation 放在任务中执行
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
//..
}重点来了,调度任务最后竟然交给了主线程的 Handler,其实想想也对,主线程的任务最后一般都会交给主线程的 Handler。
好奇的同学可能问了,如果不是主线程呢?不是主线程就利用的线程池:
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
// 执行期
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
}结果可以说是很清晰了,coroutineScheduler 是一个线程池,如果像了解具体的过程,同学们可以自行查看代码。
读到这里,你可能有一点明白 CoroutineContext 为什么要设计成一种数据结构:
coroutineContext[ContinuationInterceptor] 就可以直接取到当前协程的拦截器,并且一个协程只能对应一个调度器。coroutineContext 的前面,所以在执行协程的时候,可以做拦截处理。同理,我们也可以使用 coroutineContext[Job] 获取当前协程。
再次回到:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}现在我们看 Continue#resumeCancellableWith() 方法,它是一个扩展方法,里面的调度逻辑是:
DispatchContinuation#resumeCancellableWithCoroutineDispatcher#dispatchContinuation#resumeWith这里的 Continuation 就是 SuspendLambda,它继承了 BaseContinuationImpl,我们看一下它的实现方法:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 1. 执行 suspend 里面的代码块
val outcome = invokeSuspend(param)
// 2. 如果代码块里面执行了挂起方法,会提前返回
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 3. 如果完成的completion也是BaseContinuationImpl,就会进入循环
current = completion
param = outcome
} else {
// 4. 执行 completion resumeWith 方法
completion.resumeWith(outcome)
return
}
}
}
}
}这边被我分为2个部分:
suspend 方法,并获取结果complete(放在下一步讲)在第一处会先执行 suspend 修饰的方法内容,在方法里面可能又会调度 suspend 方法,比如说我们的实例方法:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }
val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}
suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}因为我们在 getResult 执行了延时操作,所以我们 launch 方法肯定执行了耗时挂起方法,所以 BaseContinuationImpl#invokeSuspend 方法会返回一个 COROUTINE_SUSPENDED,结果你也看到了,该方法会提前结束。(说明一下,我没有找到BaseContinuationImpl#invokeSuspend 方法的具体实现,我猜可能跟编译器有关)
我猜你肯定跟我一样好奇,遇到耗时挂起会提前返回,那么耗时挂起如何对 complete 进行恢复的?
我们看一下 delay(1000) 这个延时操作在主线程是如何处理的:
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
//...
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
//...
}可以看到,将恢复任务包了一个 Runnable,交给 Handler 的 Handler#postDelayed() 方法了。
对于 complete 的处理一般会有两种。
第一种情况是我们称之为套娃,完成回调的 Continuation 它本身也有自己的完成回调 Continuation,接下来循环就对了。
第二种情况,就是通过 complete 去完成回调,由于 complete 是 AbstractContinuation,我们看一下它的 resumeWith:
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// ...
public final override fun resumeWith(result: Result<T>) {
// 1. 获取当前协程的技术状态
val state = makeCompletingOnce(result.toState())
// 2. 如果当前还在等待完成,说明还有子协程没有结束
if (state === COMPLETING_WAITING_CHILDREN) return
// 3. 执行结束恢复的方法,默认为空
afterResume(state)
}
// 这是父类 JobSupport 中的 makeCompletingOnce 方法
// 为了方便查看,我复制过来
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
loopOnState { state ->
// tryMakeCompleting 的内容主要根据是否有子Job做不同处理
val finalState = tryMakeCompleting(state, proposedUpdate)
when {
finalState === COMPLETING_ALREADY ->
throw IllegalStateException(
"Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
)
finalState === COMPLETING_RETRY -> return@loopOnState
else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
}
}
}
}这段代码的意思其实也很简单,就是协程即将完成,得先评估一下协程的技术状态,别协程还有东西在运行,就给结束了。对于一些有子协程的一些协程,会等待子协程结束的时候,才会结束当前协程。
一个 launch 的过程大概就是这样了。大致的流程图是这样的:

launch流程
下面我们再谈谈 async。
async 和 launch 的代码相似度很高:
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}最终也会进行三步走:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一层 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做拦截处理
.intercepted()
// 调用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}不同的是,async 返回的是一个 Deferred<T>,我们需要调用 Deferred#await() 去获取返回结果,它的实现在 JobSupport:
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
// ... awaitInternal方法来自父类 JobSupport
override suspend fun await(): T = awaitInternal() as T
// ...
// 这是 JobSupport 中的实现
internal suspend fun awaitInternal(): Any? {
// 循环获取结果
while (true) { // lock-free loop on state
val state = this.state
// 1. 如果处于完成状态
if (state !is Incomplete) {
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state.unboxState()
}
// 2. 除非需要重试,不然就 break
if (startInternal(state) >= 0) break
}
// 等待挂起的方法
return awaitSuspend() // slow-path
}
}它的具体过程可以从我的注释看出,就不一一介绍了,感兴趣的同学可以查看源码。
本文一开始的代码是错的,连编译器都过不了,尴尬~
正确的代码应该是:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a.await() + bawait()
Log.e(TAG,"result:$c")
}如果是正确的代码,这里可能分两种情况:
如果你放在UI线程,那肯定是串行的,这时候有人说,我在 a 里使用 delay(1000),在 b 里使用 delay(2000),得到 c 的时候就花了 2000 毫秒啊,这不是并行吗?事情并不是这样的,delay 操作使用了 Handler#postDelay 方法,一个延迟了 1000 毫秒执行,一个延迟了 2000 毫秒执行,但是主线程只有一个,所以只能是串行。
如果是子线程,通常都是并行的,因为我们使用了线程池啊~
写这边源码分析的时候,一些细节总是找不到,比如说 suspendLambda 的子类找不到,自己对 Kotlin 的学习有待深入。

菜
所以本文有些地方还值得商榷,如果你有更好的理解,欢迎下方交流。