当前位置: 首页>后端>正文

深究Kotlin协程delay函数源码实现

鍓嶈█

鍦ㄥ紑鍙戦」鐩湡闂?Kotlin 鍗忕▼鏄粡甯镐娇鐢ㄧ殑寮傛&骞跺彂缂栫▼妗嗘灦銆傚湪鍗忕▼浣跨敤杩囩▼涓紝鏃跺父浼氱敤鍒版寕璧峰嚱鏁帮紝鑰?delay 灏辨槸涓€涓寕璧峰嚱鏁帮紝鍦ㄥ緢澶氫笟鍔″満鏅腑浼氫娇鐢ㄥ埌锛屾湰鏂囬€氳繃婧愮爜鍒嗘瀽浜嗚В鍏惰儗鍚庣殑瀹炵幇鍘熺悊

鍒嗘瀽

涓句釜馃尠
閫氬父鎴戜滑鐨勪笟鍔″満鏅湁杩欑鎯呭喌锛岄渶瑕佸欢鏃舵墽琛屾煇浜涗换鍔?/p>

private suspend fun test() {
    findViewLifecycleOwner()?.lifecycleScope?.launch() {
        print("before delay")
        //寤舵椂3s
        delay(3000)
        print("after delay")
    }
}

delay 鍑芥暟鍒欎細鎸傝捣褰撳墠鍗忕▼锛屽苟涓斾細鍦?s鍚庤繘琛屾仮澶?br> 鍐嶆潵鐪嬬湅 delay 鍑芥暟鐨勫疄鐜?/p>

/**
 * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
 *
 */
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like  awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}

suspendCancellableCoroutine

鎴戜滑鍏堟潵鐪嬬湅 suspendCancellableCoroutine 鍑芥暟锛屽畠鏄?Kotlin 鍗忕▼搴撲腑鐨勪竴涓嚱鏁帮紝瀹冩槸涓€涓寕璧峰嚱鏁帮紝鐢ㄤ簬鍒涘缓涓€涓彲浠ヨ鍙栨秷鐨勬寕璧风偣銆?br> 杩欎釜鍑芥暟涔熸槸鍗忕▼涓粡甯镐娇鐢紝瀹冨彲浠ュ皢寮傛鐨勫洖璋冪敤鍚屾鐨勬柟寮忚〃杈惧嚭鏉ワ紝鍑忓皯鍥炶皟宓屽

public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        /*
         * For non-atomic cancellation we setup parent-child relationship immediately
         * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
         * properly supports cancellation.
         */
        cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

鐒惰€?suspendCancellableCoroutine 鍑芥暟鍐呴儴鏄娇鐢ㄤ簡suspendCoroutineUninterceptedOrReturn 鍑芥暟瀹炵幇鐨勶紝鑰屼笖涔熸槸涓€涓寕璧峰嚱鏁?/p>

public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}

suspendCoroutineUninterceptedOrReturn 鍑芥暟娌℃湁瀹為檯鍐呭锛屾槸鍥犱负瀹冩槸缂栬瘧鍣ㄥ唴寤哄嚱鏁帮紝瀹冩槸鐢?Kotlin 缂栬瘧鍣ㄦ潵瀹炵幇鐨勩€備富瑕佷綔鐢ㄦ槸浼犻€掑崗绋嬩笂涓嬫枃锛屼互鍙婂垽鏂槸鍚︽寕璧风殑锛屾垨鑰呯洿鎺ヨ繑鍥炵粨鏋?br> 鍐嶄妇涓煂?br> 鍐欎釜馃憞鏂规硶锛岀劧鍚庤繘琛屽弽缂栬瘧鐪嬬湅

private suspend fun test2(){
    suspendCancellableCoroutine<String> {
        print("suspendCancellableCoroutine test")
    }
}

鍙嶇紪璇戝悗

// 浼犻€掍簡 Continuation 涓婁笅鏂?
private final Object test2(Continuation $completion) {
   int $i$f$suspendCancellableCoroutine = false;
   int var4 = false;
   //灏佽瀹炰綋
   CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
   cancellable$iv.initCancellability();
   CancellableContinuation it = (CancellableContinuation)cancellable$iv;
   int var7 = false;
   //鎵цblock浠g爜
   String var8 = "suspendCancellableCoroutine test";
   System.out.print(var8);
   //鑾峰彇鍗忕▼浣撹繑鍥炲€?
   Object var10000 = cancellable$iv.getResult();
   if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
      DebugProbesKt.probeCoroutineSuspended($completion);
   }
   //鍒ゆ柇鏄惁鏄寕璧凤紝鏄殑璇濊繑鍥炴寕璧风姸鎬?
   return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() var10000 : Unit.INSTANCE;
}

姝e涓婇潰鎵€璇寸殑锛宻uspendCoroutineUninterceptedOrReturn 鍑芥暟鏄?strong>鏂板浜嗕紶閫掑崗绋嬩笂涓嬫枃锛屼互鍙婂垽鏂槸鍚︽寕璧风殑锛屾垨鑰呯洿鎺ヨ繑鍥炵粨鏋滅殑閫昏緫
鐭ラ亾浜?suspendCancellableCoroutine 鍑芥暟鐨勪綔鐢ㄥ悗锛屽啀鍥炶繃澶寸湅 delay 鍑芥暟

/**
 * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
 *
 */
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        // if timeMillis == Long.MAX_VALUE then just wait forever like  awaitCancellation, don't schedule.
        if (timeMillis < Long.MAX_VALUE) {
            cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
        }
    }
}

suspendCancellableCoroutine 鍑芥暟浼犻€掍簡灏佽鍚庣殑鍗忕▼瀵硅薄 cont锛屼互鍙婂垽鏂?cont 鏄惁浼氭墽琛屾寕璧?/p>

DefaultExecutor

鐪嬬湅 cont.context.delay鏄暐

//鑾峰彇delay
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) asDelay ?: DefaultDelay

//榛樿瀹炵幇
internal actual val DefaultDelay: Delay = DefaultExecutor

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {

 init {
    incrementUseCount() // this event loop is never completed
 }
    ... 鐪佺暐寰堝浠g爜 ...
}

鍙互鐪嬭 cont.context.delay 鏈€缁堢殑瀹炵幇鏄?DefaultExecutor 锛屽畠缁ф壙浜咵ventLoopImplBase 鍜?Runnable
DefaultExecutor 鏄釜鍗曚緥锛岄噷杈瑰紑鍚簡绾跨▼锛屽苟涓旀娴嬮槦鍒楅噷浠诲姟鐨勬儏鍐垫潵鍐冲畾鏄惁闇€瑕佹寕璧风嚎绋嬭繘琛岀瓑寰?br> 鑰?scheduleResumeAfterDelay 鍑芥暟鏄?EventLoopImplBase 閲屽疄鐜扮殑

public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val timeNanos = delayToNanos(timeMillis)
    if (timeNanos < MAX_DELAY_NS) {
        val now = nanoTime()
        DelayedResumeTask(now + timeNanos, continuation).also { task ->
            continuation.disposeOnCancellation(task)
            schedule(now, task)
        }
    }
}

濡傛灉婊¤冻鏃堕棿鏉′欢锛屽垯鍒涘缓涓€涓欢杩熺殑task锛孌elayedResumeTask

鍏ラ槦鍒?/h3>

鐪?schedule 鍑芥暟

public fun schedule(now: Long, delayedTask: DelayedTask) {
    when (scheduleImpl(now, delayedTask)) {
        //unpark() 浼氬惎鍔ㄧ嚎绋?
        SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
        SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
        SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
        else -> error("unexpected result")
    }
}

private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
    if (isCompleted) return SCHEDULE_COMPLETED
    val delayedQueue = _delayed.value ?: run {
        _delayed.compareAndSet(null, DelayedTaskQueue(now))
        _delayed.value!!
    }
    //鍏ラ槦鍒?
    return delayedTask.scheduleTask(now, delayedQueue, this)
}

瀹為檯灏辨槸鎶?DelayedResumeTask 鏀捐繘 _delayed 闃熷垪閲岄潰锛屽苟涓斿惎鍔?DefaultExecutor 閲岀殑绾跨▼锛堝鏋滄病鏈夊紑鍚殑璇濓級
_delayed 鏄瓨鍌ㄥ欢杩熸墽琛?task 鐨勯槦鍒?br> 杩樻湁涓€涓?_queue 鏄槸瀛樺偍姝e父浠诲姟鐨勯槦鍒?/p>

internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
    // null | CLOSED_EMPTY | task | Queue<Runnable>
    private val _queue = atomic<Any?>(null)

    // Allocated only only once
    private val _delayed = atomic<DelayedTaskQueue?>(null)
    ...
    }

鏃㈢劧涓婇潰鏈夊叆闃熷垪锛屽氨鏈夊嚭闃熷垪

鍑洪槦鍒?/h3>

鍥炲埌鍒氬垰璇寸殑鍚姩浜?DefaultExecutor 閲岀殑绾跨▼锛岀湅鐪嬪畠鐨?run 鏂规硶

override fun run() {
    ThreadLocalEventLoop.setEventLoop(this)
    registerTimeLoopThread()
    try {
        var shutdownNanos = Long.MAX_VALUE
        if (!notifyStartup()) return
        while (true) {
            Thread.interrupted() // just reset interruption flag
            //鍙栭槦鍒楁牳蹇冧唬鐮?
            var parkNanos = processNextEvent()
            if (parkNanos == Long.MAX_VALUE) {
                // nothing to do, initialize shutdown timeout
                val now = nanoTime()
                if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
                val tillShutdown = shutdownNanos - now
                if (tillShutdown <= 0) return // shut thread down
                parkNanos = parkNanos.coerceAtMost(tillShutdown)
            } else
                shutdownNanos = Long.MAX_VALUE
                //濡傛灉杩斿洖鐨勬椂闂村ぇ浜?
            if (parkNanos > 0) {
                // check if shutdown was requested and bail out in this case
                if (isShutdownRequested) return
                //鎸傝捣绾跨▼涓€娈垫椂闂?
                parkNanos(this, parkNanos)
            }
        }
    } finally {
        _thread = null // this thread is dead
        acknowledgeShutdownIfNeeded()
        unregisterTimeLoopThread()
        // recheck if queues are empty after _thread reference was set to null (!!!)
        if (!isEmpty) thread // recreate thread if it is needed
    }
}

鍙栧嚭闃熷垪閲岀殑 task 閫昏緫鏄湪 processNextEvent() 鍑芥暟涓紝鐪嬬湅瀹冪殑瀹炵幇

override fun processNextEvent(): Long {
    // unconfined events take priority
    if (processUnconfinedEvent()) return 0
    // queue all delayed tasks that are due to be executed
    val delayed = _delayed.value
    if (delayed != null && !delayed.isEmpty) {
        val now = nanoTime()
        while (true) {
            // make sure that moving from delayed to queue removes from delayed only after it is added to queue
            // to make sure that 'isEmpty' and `nextTime` that check both of them
            // do not transiently report that both delayed and queue are empty during move
            delayed.removeFirstIf {
                if (it.timeToExecute(now)) {
                // 鍔犲叆姝e父浠诲姟闃熷垪
                    enqueueImpl(it)
                } else
                    false
            } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
        }
    }
    // then process one event from queue
    val task = dequeue()
    if (task != null) {
        task.run()
        return 0
    }
    //杩斿洖绾跨▼闇€瑕佹寕璧风殑鏃堕棿
    return nextTime
}

protected override val nextTime: Long
    get() {
        if (super.nextTime == 0L) return 0L
        val queue = _queue.value
        when {
            queue === null -> {} // empty queue -- proceed
            queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
            queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
            else -> return 0 // non-empty queue
        }
        val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
        return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)

鍏朵腑娴佺▼鏄細

  1. 鏈変釜姝诲惊鐜竴鐩村湪浠?_delayed 闃熷垪閲屽彇寤惰繜 task锛屽鏋滃垽鏂欢杩熸椂闂村凡缁忓埌浜嗘墠浼氬姞鍏ユ甯镐换鍔¢槦鍒楅噷涓旂Щ闄?/li>
  2. 鐩村埌鍙栦笉鍑哄欢杩?task 浜嗘墠璺冲嚭寰幆
  3. 鐒跺悗浠庢甯搁槦鍒楅噷鍙栧嚭浠诲姟杩涜鎵ц
  4. 鎵ц浠诲姟灏辨槸鍦ㄦ墽琛?DelayedResumeTask 绫婚噷鐨?run 鏂规硶
   private inner class DelayedResumeTask(
    nanoTime: Long,
    private val cont: CancellableContinuation<Unit>
) : DelayedTask(nanoTime) {
    override fun run() { with(cont) { resumeUndispatched(Unit) } }
    override fun toString(): String = super.toString() + cont.toString()
}
  1. 鍙互鏄湅瑙佽皟鐢ㄤ簡 resumeUndispatched() 鍑芥暟锛屼娇鐢ㄥ崗绋嬭兘缁忓父鐪嬭鎴栬€呬娇鐢?resumexxx 鍑芥暟锛屽畠灏辨槸鍗忕▼涓殑鎭㈠锛屽搴斾簬鎸傝捣锛屽畠浠悊搴旀槸鎴愬鍑虹幇鐨?/li>
  2. resumeUndispatched() 鍑芥暟鏈€缁堢殑瀹炵幇灏辨槸涓€寮€濮嬪皝瑁?Continuation 鍗忕▼涓?CancellableContinuationImpl 鐨勫疄鐜帮紝鍗虫槸 cont 瀵硅薄锛屾渶缁堟仮澶嶄簡鍗忕▼鐨勮繍琛?/li>
//鎭㈠鍗忕▼
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
    val dc = delegate asDispatchedContinuation
    resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}

processNextEvent() 鍑芥暟鐨勮繑鍥炲€?/h3>
  • 鏈€鍚庤繑鍥炰笅涓€涓?nextDelayedTask 鐨勫欢杩熸椂闂达紝鍗虫槸绾跨▼鎸傝捣鏃堕棿
  • 濡傛灉杩斿洖鐨勬椂闂村ぇ浜?锛屽垯鏈€缁堜細璋冪敤 LockSupport.parkNanos()锛屽皢绾跨▼鎸傝捣涓€娈垫椂闂达紝鐩村埌寤惰繜鏃堕棿缁撴潫銆?br> 婧愮爜鐪嬪埌杩欓噷灏辫兘鐭ラ亾 delay 鍑芥暟鐨勮儗鍚庤繍琛屽師鐞嗕簡锛屾渶缁堟槸鐢?DefaultExecutor 绾跨▼鏁翠綋鎺у埗鎸傝捣鍜屾仮澶嶏紝鑰屼笖涓嶄細闃诲褰撳墠璋冪敤鏂圭殑绾跨▼銆?/li>

鎬荤粨

delay 鍑芥暟鐨勬€讳綋娴佺▼鏄?/p>

  1. 鍒涘缓涓€涓欢杩熶换鍔★紙DelayedResumeTask锛夛紝瀹冨寘鍚渶瑕佸欢杩熸墽琛岀殑閫昏緫鍜屾墽琛屾椂闂翠俊鎭€?/li>
  2. 灏嗗欢杩熶换鍔℃坊鍔犲埌寤惰繜闃熷垪锛坃delayed锛変腑銆?/li>
  3. 鍦ㄧ瓑寰呭欢杩熶换鍔℃墽琛屼箣鍓嶏紝鍗忕▼浼氳鏆傚仠锛堟寕璧凤級銆?/li>
  4. 鏈変竴涓崟鐙殑 DefaultExecutor 绾跨▼瀹氭湡妫€鏌ュ欢杩熼槦鍒椾腑鐨勪换鍔★紝濡傛灉浠诲姟鐨勫欢杩熸椂闂村埌浜嗭紝灏变細灏嗕换鍔′粠寤惰繜闃熷垪涓Щ闄わ紝骞跺皢鍏舵斁鍏ユ墽琛岄槦鍒椾腑銆?/li>
  5. 鎵ц闃熷垪涓彇鍑轰换鍔★紝骞惰皟鐢ㄥ叾鎵ц閫昏緫銆?/li>
  6. 鎵ц浠诲姟鐨勮繃绋嬩篃灏辨槸鍗忕▼鐨勬仮澶嶈繃绋嬶紝涓€鏃︿换鍔″紑濮嬫墽琛岋紝鍗忕▼浼氳鎭㈠锛岀户缁墽琛屽叾鍚庣画鐨勯€昏緫銆?/li>

https://www.xamrdz.com/backend/3yu1938360.html

相关文章: