挖坑kotlin协程,预计分多篇文章彻底梳理一遍kotlin协程框架,废话不多说,先从协程作用域开始。
协程作用域CoroutinScope
在了解协程上下文之前,先要谈谈协程作用域-CoroutinScope,协程作用域可以理解为你创建的协程的约束范围,协程是运行在你约束的一个范围内的,这样就划分了协程的运行范围,对于协程的生命周期管理更加规范。每个协程构建器(如启动、异步等)都是 CoroutineScope 上的扩展,并继承其 coroutineContext 以自动传播其所有元素和取消。
CoroutinScope是一个接口,源码定义如下:
public interface CoroutineScope {
// 协程上下文,用来管理协程里的上下文元素
public val coroutineContext: CoroutineContext
}
看看协程框架里提供的全局作用域GlobalScope:
// 注意看它用object修饰了,代表他的生命周期是跟进程绑定的,而且是单例
public object GlobalScope : CoroutineScope {
/**
* 一个空的协程上下文
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
也可以自己根据需求创建作用域,官方推荐使用CoroutineScope(context: CoroutineContext)或者MainScope():
// 创建一个运行在主线程的作用域
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
// 自定义协程上下文的作用域创建方式
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
协程上下文CoroutineContext
协程上下文从字面意思来看,就是贯穿你创建的某个协程的一个管家,可以结合Android开发中的上下文来理解,它持有协程运行时的各个参数。因为刚开始学习协程,这里先给个模糊描述,否则一开始给出一堆陌生艰涩的术语强行让你理解,你也会一头雾水,随着对协程框架的深入,自然会渐渐领会。
接下来就详细看看CoroutinContext,它也是一个接口:
public interface CoroutineContext {
// 由operator修饰的操作符重载,对应“[]”操作符
// 通过key获取一个Element对象
public operator fun <E : Element> get(key: Key<E>): E
// 折叠方法,提供一个初始值R和操作lambda,该方法被其子类Element实现
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
// 由operator修饰的操作符重载,对应“+”操作符;
// 合并两个CoroutineContext对象中的Element元素,将合并后的上下文返回,如果存在相同key的Element对象,则对其进行覆盖;
// EmptyCoroutineContext一个空实现的上下文;
// CombinedContext是CoroutineContext接口的一个实现类,也是链表的具体实现的一个节点,节点存在两个元素:element 当前的节点的集合元素,left CoroutineContext类型,指向链表的下一个元素;
// 另外plus函数在合并上下文的过程中将Key为ContinuationInterceptor的元素保持在链表的尾部,方便其快速的读取;
// 先了解ContinuationInterceptor是一个拦截器,下文中会介绍它
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // 如果待合并的context是一个空上下文,返回当前的上下文
// fold遍历context集合
context.fold(this) { acc, element ->//acc为当前上下文的集合,element为context集合的元素
val removed = acc.minusKey(element.key)//移除aac集合中的element元素,并返回移除后的一个集合
if (removed === EmptyCoroutineContext)
element // 如果移除后集合是一个空的上下文集合,那么当前element元素为合并后的上下文集合
else {
val interceptor = removed[ContinuationInterceptor]//获取拦截器
if (interceptor == null) CombinedContext(removed, element) // 如果interceptor为空,生成CombinedContext节点,CombinedContext元素为element,指向的链表节点是removed
else {
// 将拦截器移至链表尾部方便读取
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
// 删除对应key的Element元素,返回删除后CoroutineContext
public fun minusKey(key: Key<*>): CoroutineContext
// 集合中每个元素的key
public interface Key<E : Element>
// 集合中的元素定义,也是一个接口
public interface Element : CoroutineContext {
// 元素的key
public val key: Key<*>
// 通过key获取该元素,对应操作符[]
public override operator fun <E : Element> get(key: Key<E>): E=
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
//// 提供遍历上下文中所有元素的能力。
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
// 删除对应key的Element元素
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
}
上面源码里提到了一个很重要的类CombinedContext,这个类很重要,对于协程上下文的数据结构存储很重要:
// 左向链表实现
// element集合元素
// left 链表的下一个节点
internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable {
// 在集合中获取一个以key为键的元素
override fun <E : Element> get(key: Key<E>): E{
var cur = this
while (true) {
cur.element[key]?.let { return it }
val next = cur.left
if (next is CombinedContext) {
cur = next
} else {
return next[key]
}
}
}
// 遍历集合中所有的元素。
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(left.fold(initial, operation), element)
// 在集合中删除一个键值为key的元素
public override fun minusKey(key: Key<*>): CoroutineContext {
element[key]?.let { return left }
val newLeft = left.minusKey(key)
return when {
newLeft === left -> this
newLeft === EmptyCoroutineContext -> element
else -> CombinedContext(newLeft, element)
}
}
// 集合长度
private fun size(): Int {
var cur = this
var size = 2
while (true) {
cur = cur.left asCombinedContext ?: return size
size++
}
}
// 集合中是否包含某个元素
private fun contains(element: Element): Boolean =
get(element.key) == element
...
}
从以上源码可以猜得出来,协程上下文是有个集合来管理上下文元素Element的,而这个Element的集合就是协程真正的上下文,里面包含被封装成Element的各个运行要素。它的结构设计使得很容易把他们拼装和获取。举个例子:
// 上下文组合添加
var context = CoroutineName("MainActivity") + Dispatchers.Main
println("1context:$context")
// 添加重复类型上下文会覆盖之前的
context += Dispatchers.Default
println("2context:$context")
// 获取CoroutineName,注意使用的key
println("3context:${context[CoroutineName.Key]}")
// 移除CoroutineName
context = context.minusKey(CoroutineName.Key)
println("4context:$context")
输出结果:
1context:[CoroutineName(MainActivity), Dispatchers.Main[missing, cause=java.lang.RuntimeException: Stub!]]
2context:[CoroutineName(MainActivity), Dispatchers.Default]
3context:CoroutineName(MainActivity)
4context:Dispatchers.Default
以上的CoroutineName和Dispatchers.main都属于Element的实现类,也是协程中上下文元素,类似的上下文元素还有很多,比如Job、CoroutineId等等。通过以上的方法可以很好管理这些元素。
协程启动策略-CoroutineStart
定义协程生成器的启动选项。定义在一个枚举类里面:
public enum class CoroutineStart {
DEFAULT,
LAZY,
@ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
ATOMIC,
UNDISPATCHED;
@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
/**
* Returns `true` when [LAZY].
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public val isLazy: Boolean get() = this === LAZY
}
DEFAULT -- 立即根据其上下文安排协程执行;
LAZY -- 仅在需要时懒惰地启动协程;
ATOMIC -- 原子(以不可取消的方式)根据其上下文安排执行协程;
UNDISPATCHED -- 立即执行协程,直到当前线程中的第一个挂起点。
这里暂时不展开解释每个启动方式的含义,先有个基本印象即可。
launch()方法启动协程
启动协程最常用的一个方式,launch()方法是协程框架提供的CoroutineScope的扩展方法,意味着必须在协程作用域里面执行:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext, // 上下文
start: CoroutineStart = CoroutineStart.DEFAULT, // 启动策略
block: suspend CoroutineScope.() -> Unit // 协程代码块
): Job {
// 第一步,先把传进来的上下文组装成一个新的
val newContext = newCoroutineContext(context)
// 第二步,使用组合后的newContext构建一个Coroutine,分析的时候我们使用DEFAULT策略,所以这里创建的是StandaloneCoroutine,
// 它实际是个实现了续体Continuation和CoroutineScope和Job的子类,具体后面分析
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
// 第三步,启动协程
coroutine.start(start, coroutine, block)
// 第四步, 作为Job返回这个实例,job可以用来控制协程的一些运行状态,可以看做是协程体的引用
return coroutine
}
// newCoroutineContext是一个扩展函数
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
// 符号“+”对应CoroutineContext的plus方法
val combined = coroutineContext + context
// 看下else非debug的情况,得到合并后的combined复制给变量debug
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
// 实例中调度器使用的Dispatchers.Default,所以这里执行else分支,直接返回coroutineContext + context相加后的结果
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
现在我们在MainActivity启动一个协程,一步步看看它是怎么运行的:
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
GlobalScope.launch(Dispatchers.Default){
println("launch test")
}
}
}
很简单的协程,没有挂起函数,只是在协程里执行了一个println()函数,接下来就跟着launch()方法一步步走下去看看协程是怎么运行起来的。
协程调度器Dispatchers
注意看,launch()方法里我们传入了Dispatchers.Default,这个对应的是参数context:CoroutinContext,说明它实现了CoroutineContext接口,结合之前分析的launch方法代码,这个调度器被组合进了newContext传入到StandaloneCoroutine里去了。来看看Dispatchers.Default是个啥:
/**
* Groups various implementations of [CoroutineDispatcher].
*/
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public val IO: CoroutineDispatcher = DefaultIoScheduler
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@DelicateCoroutinesApi
public fun shutdown() {
DefaultExecutor.shutdown()
// Also shuts down Dispatchers.IO
DefaultScheduler.shutdown()
}
}
可以看到Dispatchers.Default是对象声明Dispatchers的不可变量,类型是CoroutineDispatcher。
注意它是全局单例存在的,代表其他协程使用它的时候,用的都是同一个实例,这里可以提前说明一下Dispatchers.Default维护了一个线程池,所以全局的协程(使用Dispatchers.Default或Dispatchers.IO作为上下文)都是共用一个线程池。这个特性要记住!!
继续看看Dispatchers.Default的实现:
// Instance of Dispatchers.Default 依然是个单例对象!
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
// Shuts down the dispatcher, used only by Dispatchers.shutdown()
internal fun shutdown() {
super.close()
}
// Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close()
override fun close() {
throw UnsupportedOperationException("Dispatchers.Default cannot be closed")
}
override fun toString(): String = "Dispatchers.Default"
}
没什么可研究的信息,继续看他的父类SchedulerCoroutineDispatcher,看到它带着Scheduler字样的名字和CORE_POOL_SIZE, MAX_POOL_SIZE这些构造参数,可以判断它跟线程池有关。
// Instantiated in tests so we can test it in isolation
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
// Executor就是java里的线程池顶级接口
override val executor: Executor
get() = coroutineScheduler
// 协程的线程池实例
private var coroutineScheduler = createScheduler()
// 协程的线程池类CoroutineScheduler,这里暂时不深究这个类的实现,把它理解成java里的线程池一样的性质就行
private fun createScheduler() =
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
// 协程的调度方法,最后还是交给了线程池去调度,所谓调度,看来可以理解成线程切换
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
coroutineScheduler.dispatch(block, tailDispatch = true)
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
coroutineScheduler.dispatch(block, context, tailDispatch)
}
override fun close() {
coroutineScheduler.close()
}
...略
}
从SchedulerCoroutineDispatcher的源码可以看出,这个类维护了一个线程池,用来执行协程线程的切换。再看看它的父类ExecutorCoroutineDispatcher:
public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
/** @suppress */
@ExperimentalStdlibApi
public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
CoroutineDispatcher,
{ it asExecutorCoroutineDispatcher })
/**
* Underlying executor of current [CoroutineDispatcher].
*/
public abstract val executor: Executor
/**
* Closes this coroutine dispatcher and shuts down its executor.
*
* It may throw an exception if this dispatcher is global and cannot be closed.
*/
public abstract override fun close()
}
是个抽象类,只是定义了协程调度器的规范,继续看它的父类CoroutineDispatcher:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
/** @suppress */
@ExperimentalStdlibApi
public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
ContinuationInterceptor,
{ it asCoroutineDispatcher })
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
// 调度方法是在这里定义的
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
@InternalCoroutinesApi
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
// 核心方法interceptContinuation,拦截续体,包装成了成了DispatchedContinuation,DispatchedContinuation也是一个很重要的类
// 协程的拦截器实现
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
...略
}
// 续体拦截器接口
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...略
}
通过以上源码分析,Dispatchers.Default也就是协程调度器CoroutineDispatcher主要有两个核心方法,
dispatch(context: CoroutineContext, block: Runnable):用来切换线程
interceptContinuation(continuation: Continuation<T>): Continuation<T>:用来拦截续体Continuation,将其包装成DispatchedContinuation
记住这上面两个方法,后面的流程分析会多次用到。
续体Continuation
在上面的代码分析中频繁出现了一个概念-续体Continuation,这个概念非常重要,协程启动,挂起,恢复都有它有关,看看它的定义,是一个接口:
public interface Continuation<in T> {
/**
* 持有协程上下文
*/
public val context: CoroutineContext
/**
* 核心方法,恢复、开启协程体的执行入口方法
*
*/
public fun resumeWith(result: Result<T>)
}
要理解续体是怎么工作的,我们继续跟着launch函数走:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext, // 上下文
start: CoroutineStart = CoroutineStart.DEFAULT, // 启动策略
block: suspend CoroutineScope.() -> Unit // 协程代码块
): Job {
// 第一步,先把传进来的上下文组装成一个新的
val newContext = newCoroutineContext(context)
// 第二步,构建一个Coroutine,分析的时候我们使用DEFAULT策略,所以这里创建的是StandaloneCoroutine,
// 它实际是个实现了续体Continuation和CoroutineScope和Job的子类,具体后面分析
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
// 第三步,启动协程
coroutine.start(start, coroutine, block)
// 第四步, 作为Job返回这个实例,job可以用来控制协程的一些运行状态,可以看做是协程体的引用
return coroutine
}
第三步调用了StandaloneCoroutine的start方法:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
接着调用了CoroutineStart的invoke方法(kotlin里invoke方法可以通过对象来调用,这里官方实在很恶心,到处埋雷):
public enum class CoroutineStart {
// 到这里理一下对应关系,入参receiver和completion都是launch方法里StandaloneCoroutine实例coroutine,R泛型这里是协程作用域CoroutineScope
// StandaloneCoroutine也是实现了CoroutineScope接口的,completion则是续体Continuation,StandaloneCoroutine也实现了Continuation
// block是我们业务代码传入的协程代码块
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
// 我们使用的DEFAULT策略,执行这里启动
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
... 略
}
// StandaloneCoroutine,其父类实现了Coroutine和CoroutineScope
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
// 重点看父类AbstractCoroutine,注意它的构造参数parentContext,传入了父协程上下文
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
/**
* 合并了上下文元素
*/
@Suppress("LeakingThis")
public final override val context: CoroutineContext = parentContext + this
/**
* 关注一下这个方法,续体里定义的抽象方法
*/
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
protected open fun afterResume(state: Any?): Unit = afterCompletion(state)
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
... 略
}
继续跟踪block.startCoroutineCancellable(receiver, completion),block是我们启动协程传入的lambda,startCoroutineCancellable是lambda函数的扩展函数:
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
继续看方法体中第一个方法createCoroutineUnintercepted,这个方法也是suspend () -> T 函数的扩展函数,它是这样的:
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
// 这里对completion进行了一层封装
val probeCompletion = probeCoroutineCreated(completion)
// 判断函数suspend () -> T 实例是否属于BaseContinuationImpl的子类
return if (this is BaseContinuationImpl)
// 是的话就调用其create方法
// 这里涉及到kotlin编译的问题,用kotlin看源码往往很蛋疼,这时候就需要把之前的那个协程体反编译成java来看看
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
// MainActivity里协程启动的方法反编译成java代码
BuildersKt.launch$default((CoroutineScope)((CoroutineScope)GlobalScope.INSTANCE)
, (CoroutineContext)((CoroutineContext)Dispatchers.getDefault())
, null
// 这是我们写的lambda代码块,被包装成了Function2<CoroutineScope, Continuation<super Unit>, Object>实例,然而
// 其实在接下来还会继续被包装成SuspendLambda类对象
, (Function2)((Function2)new Function2<CoroutineScope, Continuation<super Unit>, Object>(null){
// 状态标志位
int label;
@Nullable
// 我们写的lambda表达式被放在了invokeSuspend方法里执行
public final Object invokeSuspend(@NotNull Object object) {
IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0: {
ResultKt.throwOnFailure((Object)object);
// 我们在协程里就做了一件事执行println
System.out.println((Object)"launch test");
return Unit.INSTANCE;
}
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
// 注意这个函数,就是上面createCoroutineUnintercepted函数里调用到的地方
@NotNull
public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> $completion) {
return (Continuation)new /* invalid duplicate definition of identical inner class */;
}
@Nullable
public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<super Unit> p2) {
return (this.create((Object)p1, p2)).invokeSuspend((Object)Unit.INSTANCE);
}
}), (int)2, null);
继续跟踪SuspendLambda类及其父类链:
// 构造函数的入参completion,也是个续体,注意后面的分析,这个续体是谁
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this) // this is lambda
else
super.toString() // this is continuation
}
// ContinuationImpl抽象类
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
// _context协程上下文用的入参completion的上下文
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>= null
// 重点关注这个方法,这个方法先取出了completion上下文中的续体拦截器,然后调用了interceptContinuation方法
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation // just in case
}
}
// BaseContinuationImpl抽象类,注意看实现了Continuation接口,说明我们写的协程方法体也是一个续体Continuation
internal abstract class BaseContinuationImpl(
// This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
// it has a public getter (since even untrusted code is allowed to inspect its call stack).
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
// Continuation的核心方法
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 核心代码,调用了invokeSuspend函数,前面分析过这个函数里放的就是我们写的协程方法体block!!!
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
// 在上面的反编译的java有对该方法的实现
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
public open fun create(completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
// 在上面的反编译的java有对该方法的实现
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}
public override fun toString(): String =
"Continuation at ${getStackTraceElement() ?: this::class.java.name}"
// --- CoroutineStackFrame implementation
public override val callerFrame: CoroutineStackFrame?
get() = completion asCoroutineStackFrame
public override fun getStackTraceElement(): StackTraceElement=
getStackTraceElementImpl()
}
做个小总结:协程启动函数里的协程体lambda代码块实际编译的时候会被封装成SuspendLambda对象,而SuspendLambda的继承链路是这样的:
SuspendLambda->ContinuationImpl->BaseContinuationImpl->Continuation
说明协程体lambda代码块也是个续体Continuation,并且在协程启动方法里会调用自身实现的create(value: Any?, completion: Continuation<>)方法创建实例
继续回到协程启动流程:
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
// 不难发现,协程的启动就是由续体串起来的,记住每个调用的时候是哪个续体实例,就能记住协程的启动流程
createCoroutineUnintercepted(completion) // 第一步,将block转为SuspendLambda实例,此实例持有了completion,也就持有了创建协程时传入的上下文,包括拦截器
.intercepted() // 第二步,调用了intercepted()方法,这个方法是个Continuation扩展函数
.resumeCancellableWith(Result.success(Unit)) // 第三步,这个函数依然是Continuation扩展函数
}
// 第二步的扩展函数intercepted()
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
// 调用它的对象正是SuspendLambda,它就是ContinuationImpl的子类,而从上面的ContinuationImpl源码我们可以
//看到,ContinuationImpl的intercepted()方法实际是调用了其持有的completion上下文中的续体拦截器,然后调用了interceptContinuation方法。
// 再往上面看这个completion正是协程启动launch方法里创建的StandaloneCoroutine,它里面的上下文属于拦截器性质的就是我们之前研究的Dispatchers.DEFAULT。
// 由此深挖到最终执行了CoroutineDispatcher类里的interceptContinuation(continuation: Continuation<T>): Continuation<T>方法,它返回了一个
// 经过包装后的DispatchedContinuation实例,它也是个续体Continuation,后面会继续分析
(this asContinuationImpl)?.intercepted() ?: this
// 第三步的扩展函数resumeCancellableWith()
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)= null
): Unit = when (this) {
// 经过第二步分析我们知道最终我们得到的就是一个DispatchedContinuation对象,所以执行了resumeCancellableWith(result, onCancellation)方法
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
接下来看看DispatchedContinuation类,重点看resumeCancellableWith方法:
// 注意看它的构造方法参数,一个CoroutineDispatcher和Continuation,从CoroutineDispatcher类里interceptContinuation(continuation: Continuation<T>)方法
// 可以看到,构建DispatchedContinuation传入的dispatcher就是自己(this),而传入的continuation就是由我们写的协程lambda表达式封装成的SuspendLambda。
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
// delegate是自己,其实就是代理的continuation在干活
override val delegate: Continuation<T>
get() = this
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
// 关键步骤,在这里执行了调度方法dispatch,前面我们分析过dispatcher的dispatch方法就是往线程池里调度了一个runnable,真正
// 执行的地方是run方法,而这个runnable传的是this,说明DispatchedContinuation是实现了Runnable接口的,而实现的地方其实是在其父类DispatchedTask
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
...略
}
// 父类DispatchedTask,父类SchedulerTask继续往上找继承关系就能找到Runnable接口,这里不深究线程池,会另外开篇讲
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
internal abstract val delegate: Continuation<T>
// 关键方法run
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable= null
try {
val delegate = delegate as DispatchedContinuation<T>
// 结合前面的代码,这个continuation是SuspendLambda对象
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
// 异常处理
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
// 正常流程走到这里,调用resume方法,resume也是个扩展函数,真正执行的是continuation的resumeWith方法
// 前面分析过SuspendLambda的resumeWith方法是其父类BaseContinuationImpl实现的,里面执行了invokeSuspend()方法,
// 至此,我们定义的协程体方法终于得到了执行。
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
...略
}
// resume也是个扩展函数
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
至此,在各个续体饶了一大圈终于梳理完了协程是怎么工作的,此篇文章主要是理清楚协程的一些基本概念和运转流程,还没涉及到挂起和恢复和协程线程池的管理,下篇将会对协程的挂起和恢复进行剖析。
附:流程图