当前位置: 首页>后端>正文

庖丁解牛,一文搞懂Kotlin协程的运行原理

挖坑kotlin协程,预计分多篇文章彻底梳理一遍kotlin协程框架,废话不多说,先从协程作用域开始。

协程作用域CoroutinScope

在了解协程上下文之前,先要谈谈协程作用域-CoroutinScope,协程作用域可以理解为你创建的协程的约束范围,协程是运行在你约束的一个范围内的,这样就划分了协程的运行范围,对于协程的生命周期管理更加规范。每个协程构建器(如启动、异步等)都是 CoroutineScope 上的扩展,并继承其 coroutineContext 以自动传播其所有元素和取消。
CoroutinScope是一个接口,源码定义如下:

public interface CoroutineScope {
    // 协程上下文,用来管理协程里的上下文元素
    public val coroutineContext: CoroutineContext
}

看看协程框架里提供的全局作用域GlobalScope:

// 注意看它用object修饰了,代表他的生命周期是跟进程绑定的,而且是单例
public object GlobalScope : CoroutineScope {
    /**
     * 一个空的协程上下文
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

也可以自己根据需求创建作用域,官方推荐使用CoroutineScope(context: CoroutineContext)或者MainScope():

// 创建一个运行在主线程的作用域
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
// 自定义协程上下文的作用域创建方式
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())
协程上下文CoroutineContext

协程上下文从字面意思来看,就是贯穿你创建的某个协程的一个管家,可以结合Android开发中的上下文来理解,它持有协程运行时的各个参数。因为刚开始学习协程,这里先给个模糊描述,否则一开始给出一堆陌生艰涩的术语强行让你理解,你也会一头雾水,随着对协程框架的深入,自然会渐渐领会。
接下来就详细看看CoroutinContext,它也是一个接口:

public interface CoroutineContext {  
     // 由operator修饰的操作符重载,对应“[]”操作符  
     // 通过key获取一个Element对象  
     public operator fun <E : Element> get(key: Key<E>): E 
   
     // 折叠方法,提供一个初始值R和操作lambda,该方法被其子类Element实现
     public fun <R> fold(initial: R, operation: (R, Element) -> R): R  
     // 由operator修饰的操作符重载,对应“+”操作符;
     // 合并两个CoroutineContext对象中的Element元素,将合并后的上下文返回,如果存在相同key的Element对象,则对其进行覆盖;
     // EmptyCoroutineContext一个空实现的上下文;
     // CombinedContext是CoroutineContext接口的一个实现类,也是链表的具体实现的一个节点,节点存在两个元素:element 当前的节点的集合元素,left CoroutineContext类型,指向链表的下一个元素;
     // 另外plus函数在合并上下文的过程中将Key为ContinuationInterceptor的元素保持在链表的尾部,方便其快速的读取;
     // 先了解ContinuationInterceptor是一个拦截器,下文中会介绍它  

     public operator fun plus(context: CoroutineContext): CoroutineContext =  
         if (context === EmptyCoroutineContext) this else // 如果待合并的context是一个空上下文,返回当前的上下文  
             // fold遍历context集合  
             context.fold(this) { acc, element ->//acc为当前上下文的集合,element为context集合的元素  
                 val removed = acc.minusKey(element.key)//移除aac集合中的element元素,并返回移除后的一个集合  
                 if (removed === EmptyCoroutineContext)  
                      element // 如果移除后集合是一个空的上下文集合,那么当前element元素为合并后的上下文集合  
                 else {  
                     val interceptor = removed[ContinuationInterceptor]//获取拦截器  
                     if (interceptor == null) CombinedContext(removed, element) // 如果interceptor为空,生成CombinedContext节点,CombinedContext元素为element,指向的链表节点是removed  
                     else {  
                         // 将拦截器移至链表尾部方便读取  
                        val left = removed.minusKey(ContinuationInterceptor)  
                         if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else  
                             CombinedContext(CombinedContext(left, element), interceptor)  
                     }  
                 }  
             }  
   
     // 删除对应key的Element元素,返回删除后CoroutineContext  
     public fun minusKey(key: Key<*>): CoroutineContext     
     // 集合中每个元素的key  
     public interface Key<E : Element>  
   
     // 集合中的元素定义,也是一个接口  
     public interface Element : CoroutineContext {  
         // 元素的key  
         public val key: Key<*>  
   
         // 通过key获取该元素,对应操作符[]  
         public override operator fun <E : Element> get(key: Key<E>): E=  
             @Suppress("UNCHECKED_CAST")  
             if (this.key == key) this as E else null  
         //// 提供遍历上下文中所有元素的能力。  
         public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =  
             operation(initial, this)  
   
        // 删除对应key的Element元素  
         public override fun minusKey(key: Key<*>): CoroutineContext =  
             if (this.key == key) EmptyCoroutineContext else this  
     }  
 }

上面源码里提到了一个很重要的类CombinedContext,这个类很重要,对于协程上下文的数据结构存储很重要:

// 左向链表实现  
 // element集合元素  
 // left 链表的下一个节点  
 internal class CombinedContext(  
     private val left: CoroutineContext,  
     private val element: Element  
 ) : CoroutineContext, Serializable {  
   
     // 在集合中获取一个以key为键的元素  
     override fun <E : Element> get(key: Key<E>): E{  
         var cur = this  
         while (true) {  
             cur.element[key]?.let { return it }  
             val next = cur.left  
             if (next is CombinedContext) {  
                 cur = next  
             } else {  
                 return next[key]  
             }  
         }  
     }  
   
     // 遍历集合中所有的元素。  
     public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =  
         operation(left.fold(initial, operation), element)  
   
     // 在集合中删除一个键值为key的元素  
     public override fun minusKey(key: Key<*>): CoroutineContext {  
         element[key]?.let { return left }  
         val newLeft = left.minusKey(key)  
         return when {  
             newLeft === left -> this  
             newLeft === EmptyCoroutineContext -> element  
             else -> CombinedContext(newLeft, element)  
         }  
     }  
   
     // 集合长度  
     private fun size(): Int {  
         var cur = this  
         var size = 2  
         while (true) {  
             cur = cur.left asCombinedContext ?: return size  
             size++  
         }  
     }  
   
     // 集合中是否包含某个元素  
     private fun contains(element: Element): Boolean =  
         get(element.key) == element  
        ...  
 } 

从以上源码可以猜得出来,协程上下文是有个集合来管理上下文元素Element的,而这个Element的集合就是协程真正的上下文,里面包含被封装成Element的各个运行要素。它的结构设计使得很容易把他们拼装和获取。举个例子:

        // 上下文组合添加
        var context = CoroutineName("MainActivity") + Dispatchers.Main
        println("1context:$context")
        // 添加重复类型上下文会覆盖之前的
        context += Dispatchers.Default
        println("2context:$context")
        // 获取CoroutineName,注意使用的key
        println("3context:${context[CoroutineName.Key]}")
        // 移除CoroutineName
        context = context.minusKey(CoroutineName.Key)
        println("4context:$context")

输出结果:

  1context:[CoroutineName(MainActivity), Dispatchers.Main[missing, cause=java.lang.RuntimeException: Stub!]]
  2context:[CoroutineName(MainActivity), Dispatchers.Default]
  3context:CoroutineName(MainActivity)
  4context:Dispatchers.Default

以上的CoroutineName和Dispatchers.main都属于Element的实现类,也是协程中上下文元素,类似的上下文元素还有很多,比如Job、CoroutineId等等。通过以上的方法可以很好管理这些元素。

协程启动策略-CoroutineStart

定义协程生成器的启动选项。定义在一个枚举类里面:

public enum class CoroutineStart {
    
    DEFAULT,
    LAZY,
    @ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
    ATOMIC,
    UNDISPATCHED;
 
    @InternalCoroutinesApi
    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }

    @InternalCoroutinesApi
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

    /**
     * Returns `true` when [LAZY].
     *
     * @suppress **This an internal API and should not be used from general code.**
     */
    @InternalCoroutinesApi
    public val isLazy: Boolean get() = this === LAZY
}

DEFAULT -- 立即根据其上下文安排协程执行;
LAZY -- 仅在需要时懒惰地启动协程;
ATOMIC -- 原子(以不可取消的方式)根据其上下文安排执行协程;
UNDISPATCHED -- 立即执行协程,直到当前线程中的第一个挂起点。
这里暂时不展开解释每个启动方式的含义,先有个基本印象即可。

launch()方法启动协程

启动协程最常用的一个方式,launch()方法是协程框架提供的CoroutineScope的扩展方法,意味着必须在协程作用域里面执行:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext, // 上下文
    start: CoroutineStart = CoroutineStart.DEFAULT, // 启动策略
    block: suspend CoroutineScope.() -> Unit // 协程代码块
): Job {
    // 第一步,先把传进来的上下文组装成一个新的
    val newContext = newCoroutineContext(context)
    // 第二步,使用组合后的newContext构建一个Coroutine,分析的时候我们使用DEFAULT策略,所以这里创建的是StandaloneCoroutine,
    // 它实际是个实现了续体Continuation和CoroutineScope和Job的子类,具体后面分析
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // 第三步,启动协程
    coroutine.start(start, coroutine, block)
   // 第四步, 作为Job返回这个实例,job可以用来控制协程的一些运行状态,可以看做是协程体的引用
    return coroutine
}

 // newCoroutineContext是一个扩展函数  
 public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {  
     // 符号“+”对应CoroutineContext的plus方法  
     val combined = coroutineContext + context  
          // 看下else非debug的情况,得到合并后的combined复制给变量debug  
     val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined  
     // 实例中调度器使用的Dispatchers.Default,所以这里执行else分支,直接返回coroutineContext + context相加后的结果  
     return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)  
         debug + Dispatchers.Default else debug  
 }  

现在我们在MainActivity启动一个协程,一步步看看它是怎么运行的:

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        GlobalScope.launch(Dispatchers.Default){
            println("launch test")
        }
    }
}

很简单的协程,没有挂起函数,只是在协程里执行了一个println()函数,接下来就跟着launch()方法一步步走下去看看协程是怎么运行起来的。

协程调度器Dispatchers

注意看,launch()方法里我们传入了Dispatchers.Default,这个对应的是参数context:CoroutinContext,说明它实现了CoroutineContext接口,结合之前分析的launch方法代码,这个调度器被组合进了newContext传入到StandaloneCoroutine里去了。来看看Dispatchers.Default是个啥:

/**
 * Groups various implementations of [CoroutineDispatcher].
 */
public actual object Dispatchers {

    @JvmStatic
    public actual val Default: CoroutineDispatcher = DefaultScheduler

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultIoScheduler

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    @DelicateCoroutinesApi
    public fun shutdown() {
        DefaultExecutor.shutdown()
        // Also shuts down Dispatchers.IO
        DefaultScheduler.shutdown()
    }
}

可以看到Dispatchers.Default是对象声明Dispatchers的不可变量,类型是CoroutineDispatcher。
注意它是全局单例存在的,代表其他协程使用它的时候,用的都是同一个实例,这里可以提前说明一下Dispatchers.Default维护了一个线程池,所以全局的协程(使用Dispatchers.Default或Dispatchers.IO作为上下文)都是共用一个线程池。这个特性要记住!!
继续看看Dispatchers.Default的实现:

// Instance of Dispatchers.Default 依然是个单例对象!
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    // Shuts down the dispatcher, used only by Dispatchers.shutdown()
    internal fun shutdown() {
        super.close()
    }

    // Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close()
    override fun close() {
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")
    }

    override fun toString(): String = "Dispatchers.Default"
}

没什么可研究的信息,继续看他的父类SchedulerCoroutineDispatcher,看到它带着Scheduler字样的名字和CORE_POOL_SIZE, MAX_POOL_SIZE这些构造参数,可以判断它跟线程池有关。

// Instantiated in tests so we can test it in isolation
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
    // Executor就是java里的线程池顶级接口
    override val executor: Executor
        get() = coroutineScheduler

    // 协程的线程池实例
    private var coroutineScheduler = createScheduler()
    // 协程的线程池类CoroutineScheduler,这里暂时不深究这个类的实现,把它理解成java里的线程池一样的性质就行
    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    // 协程的调度方法,最后还是交给了线程池去调度,所谓调度,看来可以理解成线程切换
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        coroutineScheduler.dispatch(block, tailDispatch = true)

    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        coroutineScheduler.dispatch(block, context, tailDispatch)
    }

    override fun close() {
        coroutineScheduler.close()
    }
    ...略
}

从SchedulerCoroutineDispatcher的源码可以看出,这个类维护了一个线程池,用来执行协程线程的切换。再看看它的父类ExecutorCoroutineDispatcher:

public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<CoroutineDispatcher, ExecutorCoroutineDispatcher>(
        CoroutineDispatcher,
        { it asExecutorCoroutineDispatcher })

    /**
     * Underlying executor of current [CoroutineDispatcher].
     */
    public abstract val executor: Executor

    /**
     * Closes this coroutine dispatcher and shuts down its executor.
     *
     * It may throw an exception if this dispatcher is global and cannot be closed.
     */
    public abstract override fun close()
}

是个抽象类,只是定义了协程调度器的规范,继续看它的父类CoroutineDispatcher:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        { it asCoroutineDispatcher })

    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    // 调度方法是在这里定义的
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)

    @InternalCoroutinesApi
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)

    // 核心方法interceptContinuation,拦截续体,包装成了成了DispatchedContinuation,DispatchedContinuation也是一个很重要的类
    // 协程的拦截器实现
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

    ...略
}

// 续体拦截器接口
public interface ContinuationInterceptor : CoroutineContext.Element {
    /**
     * The key that defines *the* context interceptor.
     */
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>

    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    ...略
}

通过以上源码分析,Dispatchers.Default也就是协程调度器CoroutineDispatcher主要有两个核心方法,
dispatch(context: CoroutineContext, block: Runnable):用来切换线程
interceptContinuation(continuation: Continuation<T>): Continuation<T>:用来拦截续体Continuation,将其包装成DispatchedContinuation
记住这上面两个方法,后面的流程分析会多次用到。

续体Continuation

在上面的代码分析中频繁出现了一个概念-续体Continuation,这个概念非常重要,协程启动,挂起,恢复都有它有关,看看它的定义,是一个接口:

public interface Continuation<in T> {
    /**
     * 持有协程上下文
     */
    public val context: CoroutineContext

    /**
     *  核心方法,恢复、开启协程体的执行入口方法
     *
     */
    public fun resumeWith(result: Result<T>)
}

要理解续体是怎么工作的,我们继续跟着launch函数走:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext, // 上下文
    start: CoroutineStart = CoroutineStart.DEFAULT, // 启动策略
    block: suspend CoroutineScope.() -> Unit // 协程代码块
): Job {
    // 第一步,先把传进来的上下文组装成一个新的
    val newContext = newCoroutineContext(context)
    // 第二步,构建一个Coroutine,分析的时候我们使用DEFAULT策略,所以这里创建的是StandaloneCoroutine,
    // 它实际是个实现了续体Continuation和CoroutineScope和Job的子类,具体后面分析
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // 第三步,启动协程
    coroutine.start(start, coroutine, block)
   // 第四步, 作为Job返回这个实例,job可以用来控制协程的一些运行状态,可以看做是协程体的引用
    return coroutine
}

第三步调用了StandaloneCoroutine的start方法:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }

接着调用了CoroutineStart的invoke方法(kotlin里invoke方法可以通过对象来调用,这里官方实在很恶心,到处埋雷):

public enum class CoroutineStart {
// 到这里理一下对应关系,入参receiver和completion都是launch方法里StandaloneCoroutine实例coroutine,R泛型这里是协程作用域CoroutineScope
// StandaloneCoroutine也是实现了CoroutineScope接口的,completion则是续体Continuation,StandaloneCoroutine也实现了Continuation
// block是我们业务代码传入的协程代码块
   public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            // 我们使用的DEFAULT策略,执行这里启动
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }
    ... 略
}
// StandaloneCoroutine,其父类实现了Coroutine和CoroutineScope
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
//  重点看父类AbstractCoroutine,注意它的构造参数parentContext,传入了父协程上下文
public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    /**
     * 合并了上下文元素
     */
    @Suppress("LeakingThis")
    public final override val context: CoroutineContext = parentContext + this

    /**
     * 关注一下这个方法,续体里定义的抽象方法
     */
    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }

    protected open fun afterResume(state: Any?): Unit = afterCompletion(state)

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }
    ... 略
}

继续跟踪block.startCoroutineCancellable(receiver, completion),block是我们启动协程传入的lambda,startCoroutineCancellable是lambda函数的扩展函数:

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

继续看方法体中第一个方法createCoroutineUnintercepted,这个方法也是suspend () -> T 函数的扩展函数,它是这样的:

 public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(  
     completion: Continuation<T>  
 ): Continuation<Unit> {  
     // 这里对completion进行了一层封装 
     val probeCompletion = probeCoroutineCreated(completion)  
    // 判断函数suspend () -> T 实例是否属于BaseContinuationImpl的子类
     return if (this is BaseContinuationImpl)  
        // 是的话就调用其create方法
        // 这里涉及到kotlin编译的问题,用kotlin看源码往往很蛋疼,这时候就需要把之前的那个协程体反编译成java来看看
         create(probeCompletion)  
     else  
         createCoroutineFromSuspendFunction(probeCompletion) {  
             (this as Function1<Continuation<T>, Any?>).invoke(it)  
         }  
 }


// MainActivity里协程启动的方法反编译成java代码
BuildersKt.launch$default((CoroutineScope)((CoroutineScope)GlobalScope.INSTANCE)
, (CoroutineContext)((CoroutineContext)Dispatchers.getDefault())
, null
// 这是我们写的lambda代码块,被包装成了Function2<CoroutineScope, Continuation<super Unit>, Object>实例,然而
// 其实在接下来还会继续被包装成SuspendLambda类对象
, (Function2)((Function2)new Function2<CoroutineScope, Continuation<super Unit>, Object>(null){
            // 状态标志位
            int label;
            
            @Nullable
            // 我们写的lambda表达式被放在了invokeSuspend方法里执行
            public final Object invokeSuspend(@NotNull Object object) {
                IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch (this.label) {
                    case 0: {
                        ResultKt.throwOnFailure((Object)object);
                        // 我们在协程里就做了一件事执行println
                        System.out.println((Object)"launch test");
                        return Unit.INSTANCE;
                    }
                }
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            // 注意这个函数,就是上面createCoroutineUnintercepted函数里调用到的地方
            @NotNull
            public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> $completion) {
                return (Continuation)new /* invalid duplicate definition of identical inner class */;
            }

            @Nullable
            public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<super Unit> p2) {
                return (this.create((Object)p1, p2)).invokeSuspend((Object)Unit.INSTANCE);
            }
        }), (int)2, null);

继续跟踪SuspendLambda类及其父类链:

// 构造函数的入参completion,也是个续体,注意后面的分析,这个续体是谁
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)

    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation
}

// ContinuationImpl抽象类
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    // _context协程上下文用的入参completion的上下文
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>= null

    // 重点关注这个方法,这个方法先取出了completion上下文中的续体拦截器,然后调用了interceptContinuation方法
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}

// BaseContinuationImpl抽象类,注意看实现了Continuation接口,说明我们写的协程方法体也是一个续体Continuation
internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    // Continuation的核心方法
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // 核心代码,调用了invokeSuspend函数,前面分析过这个函数里放的就是我们写的协程方法体block!!!
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
    // 在上面的反编译的java有对该方法的实现
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }

    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }
    // 在上面的反编译的java有对该方法的实现
    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }

    public override fun toString(): String =
        "Continuation at ${getStackTraceElement() ?: this::class.java.name}"

    // --- CoroutineStackFrame implementation

    public override val callerFrame: CoroutineStackFrame?
        get() = completion asCoroutineStackFrame

    public override fun getStackTraceElement(): StackTraceElement=
        getStackTraceElementImpl()
}

做个小总结:协程启动函数里的协程体lambda代码块实际编译的时候会被封装成SuspendLambda对象,而SuspendLambda的继承链路是这样的:
SuspendLambda->ContinuationImpl->BaseContinuationImpl->Continuation
说明协程体lambda代码块也是个续体Continuation,并且在协程启动方法里会调用自身实现的create(value: Any?, completion: Continuation<>)方法创建实例

继续回到协程启动流程

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    // 不难发现,协程的启动就是由续体串起来的,记住每个调用的时候是哪个续体实例,就能记住协程的启动流程
    createCoroutineUnintercepted(completion) // 第一步,将block转为SuspendLambda实例,此实例持有了completion,也就持有了创建协程时传入的上下文,包括拦截器
        .intercepted()  // 第二步,调用了intercepted()方法,这个方法是个Continuation扩展函数
        .resumeCancellableWith(Result.success(Unit)) // 第三步,这个函数依然是Continuation扩展函数
}

// 第二步的扩展函数intercepted()
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    // 调用它的对象正是SuspendLambda,它就是ContinuationImpl的子类,而从上面的ContinuationImpl源码我们可以
    //看到,ContinuationImpl的intercepted()方法实际是调用了其持有的completion上下文中的续体拦截器,然后调用了interceptContinuation方法。
    // 再往上面看这个completion正是协程启动launch方法里创建的StandaloneCoroutine,它里面的上下文属于拦截器性质的就是我们之前研究的Dispatchers.DEFAULT。
    // 由此深挖到最终执行了CoroutineDispatcher类里的interceptContinuation(continuation: Continuation<T>): Continuation<T>方法,它返回了一个
    // 经过包装后的DispatchedContinuation实例,它也是个续体Continuation,后面会继续分析
    (this asContinuationImpl)?.intercepted() ?: this

// 第三步的扩展函数resumeCancellableWith()
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)= null
): Unit = when (this) {
    // 经过第二步分析我们知道最终我们得到的就是一个DispatchedContinuation对象,所以执行了resumeCancellableWith(result, onCancellation)方法
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

接下来看看DispatchedContinuation类,重点看resumeCancellableWith方法:

// 注意看它的构造方法参数,一个CoroutineDispatcher和Continuation,从CoroutineDispatcher类里interceptContinuation(continuation: Continuation<T>)方法
// 可以看到,构建DispatchedContinuation传入的dispatcher就是自己(this),而传入的continuation就是由我们写的协程lambda表达式封装成的SuspendLambda。
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
  // delegate是自己,其实就是代理的continuation在干活
    override val delegate: Continuation<T>
            get() = this
      inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            // 关键步骤,在这里执行了调度方法dispatch,前面我们分析过dispatcher的dispatch方法就是往线程池里调度了一个runnable,真正
          // 执行的地方是run方法,而这个runnable传的是this,说明DispatchedContinuation是实现了Runnable接口的,而实现的地方其实是在其父类DispatchedTask
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
    ...略
}

// 父类DispatchedTask,父类SchedulerTask继续往上找继承关系就能找到Runnable接口,这里不深究线程池,会另外开篇讲
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    internal abstract val delegate: Continuation<T>

    // 关键方法run
    public final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
        val taskContext = this.taskContext
        var fatalException: Throwable= null
        try {
            val delegate = delegate as DispatchedContinuation<T>

            // 结合前面的代码,这个continuation是SuspendLambda对象
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                val context = continuation.context
                val state = takeState() // NOTE: Must take state in any case, even if cancelled
                val exception = getExceptionalResult(state)
                /*
                 * Check whether continuation was originally resumed with an exception.
                 * If so, it dominates cancellation, otherwise the original exception
                 * will be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    // 异常处理
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        continuation.resumeWithException(exception)
                    } else {
                        // 正常流程走到这里,调用resume方法,resume也是个扩展函数,真正执行的是continuation的resumeWith方法
                        // 前面分析过SuspendLambda的resumeWith方法是其父类BaseContinuationImpl实现的,里面执行了invokeSuspend()方法,
                        // 至此,我们定义的协程体方法终于得到了执行。
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            // This instead of runCatching to have nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }
  ...略
 }
    // resume也是个扩展函数
    public inline fun <T> Continuation<T>.resume(value: T): Unit =
        resumeWith(Result.success(value))

至此,在各个续体饶了一大圈终于梳理完了协程是怎么工作的,此篇文章主要是理清楚协程的一些基本概念和运转流程,还没涉及到挂起和恢复和协程线程池的管理,下篇将会对协程的挂起和恢复进行剖析。
附:流程图


庖丁解牛,一文搞懂Kotlin协程的运行原理,第1张
dda0721790fe4b419586745f746b77a8.png

https://www.xamrdz.com/backend/3y61938416.html

相关文章: