当前位置: 首页>编程语言>正文

庖丁解牛,一文搞懂Kotlin协程的常用方法

本篇文章举例协程的各种方法的使用,并简单阐述各个方法的一些注意事项。
协程作用域的创建
1.通过工厂函数创建自定义上下文的作用域
// 举个例子:
// 协程异常处理器(也是一个上下文元素)
private val exceptionHandler =
        CoroutineExceptionHandler { _, throwable ->
            onError(throwable)
        }
val myScope = CoroutineScope(exceptionHandler + SupervisorJob() + Dispatchers.IO)

// 源码
public fun CoroutineScope(
    context: CoroutineContext
): CoroutineScope = ContextScope(
    if (context[Job] != null) context
    else context + Job()
)
    
internal class ContextScope(
    context: CoroutineContext
) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    
    override fun toString(): String =
        "CoroutineScope(coroutineContext=$coroutineContext)"
}

2.通过工厂函数MainScope()创建主线程调度器的作用域
// 举个例子:
val mainScope = MainScope()

// 源码
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
3.官方架构提供的一些作用域viewModelScope和lifecycleScope
//  Android JetPack ViewModel里的viewModelScope,ViewModeld的扩展变量
// 来自androidx.lifecycle:lifecycle-viewmodel-ktx:2.3.1
public val ViewModel.viewModelScope: CoroutineScope
    get() {
        val scope: CoroutineScope= this.getTag(JOB_KEY)
        
        if (scope != null) {
            return scope
        }
        
        return setTagIfAbsent(
            JOB_KEY,
            CloseableCoroutineScope(
                SupervisorJob() +
                Dispatchers.Main.immediate
            )
        )
}

internal class CloseableCoroutineScope(
    context: CoroutineContext
) : Closeable, CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    
    override fun close() {
        coroutineContext.cancel()
    }
}

//  Android JetPack lifecycle里的lifecycleScope,LifecycleOwner(Activity实现了此接口,可以直接用)的扩展变量
// 来自androidx.lifecycle:lifecycle-runtime-ktx:2.2.0

val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope

val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }
协程的创建
方式一:launch

创建一个协程,并返回协程的引用job,最常用的方式

val job = MainScope().launch (Dispatchers.IO){
                delay(1000)
                println("run in coroutine")
            }

job.cancel()  // 取消协程
job.join()  // 等待协程执行完毕
job.cancelAndJoin() // 取消并等待协程执行完毕(取消操作并不等于会立马响应,这里其实是分别执行了cancel和join)
job.cancelChildren() // 取消子协程
方式二:async

创建一个协程,并返回协程的引用Deferred(也是个job),通过Deferred的await拿到返回结果,此时协程会挂起

MainScope().launch (Dispatchers.Main){
              // 启动协程
                val deferred = async {
                    "async result"
                }
                // 挂起等待协程结果
                val result = deferred.await()
                // 恢复挂起
                println("result = $result")
   }
作用域构建器
runBlocking

runBlocking {} 会在开启一个新的协程,并且阻塞主线程,直到操作完成。这个函数不应该在协程里面使用,它是用来桥接需要阻塞的挂起函数,主要用于 main function 和 junit 测试。注意他只是一个普通函数。

coroutineScope
        runBlocking {
            val result = coroutineScope {
                println("run thread = ${Thread.currentThread().name}")
                launch {
                    delay(1000)
                    println("run launch success1")
                }
                launch {
                    delay(2000)
                    println("run launch success2")
                }
                "coroutineScope result"
            }
            println("result = $result")
        }
// 运行结果(看得出来使用的是父协程的线程调度器)
run thread = main   
run launch success1
run launch success2
result = coroutineScope result

// 注意这是一个挂起函数,它会挂起执行一直到里面的子协程执行完毕、代码执行完毕然后返回结果,这是他与runBlocking不同的地方,runBlocking只是个普通函数,里面的协程体会阻塞线程。
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        val coroutine = ScopeCoroutine(uCont.context, uCont)
        coroutine.startUndispatchedOrReturn(coroutine, block)
    }
}

coroutineScope经常用来把一个长耗时的任务拆分成多个子任务,使这些子任务并行执行,这就是所谓的结构化并发:

suspend fun showSomeData() = coroutineScope {
    val data1 = async {         //子任务1
        delay(2000)
        100
    }
    val data2 = async {         //子任务2
        delay(3000)
        20
    }
    
    withContext(Dispatchers.Default) {      //合并结果并返回
        delay(3000)
        val random = Random(10)
        data1.await() + data2.await() + random.nextInt(100)
    }
}
supervisorScope

supervisorScope功能与coroutinScope类似,不同之处在于,它可以让里面的子协程发生异常而崩溃的时候不影响自身和其他的子协程的执行。

        runBlocking {
            val result = supervisorScope {
                println("run thread = ${Thread.currentThread().name}")
                launch {
                    delay(1000)
                  // 主动抛异常
                    throw RuntimeException()
                    println("run launch success1")
                }
                launch {
                    delay(2000)
                    println("run launch success2")
                }
                "coroutineScope result"
            }
            println("result = $result")
        }
// 运行结果
run thread = main
run launch success2
result = coroutineScope result

说到supervisorScope顺便提一嘴SupervisorJob,创建作用域或者启动协程的时候如果上下文带上SupervisorJob()也可以达到自身协程处理异常而不影响父协程和兄弟协程的运行。

常用函数
withTimeout
suspend fun timeOut(){
    try {
        withTimeout(1000){
            println("在此做一些耗时操作,超过设定的时间就抛异常")
            delay(2500)
        }
    }catch (e:TimeoutCancellationException){
        println(e.message)
        // 处理超时异常
    }
}
withContext
// 挂起函数,适合一些耗时异步操作
suspend fun timeOut(){
   val result = withContext(Dispatchers.IO){
        delay(2500)
        "result"
    }
println(result)
}
flow

private fun flowEmit() = flow<Int> {
        // 这段代码块实际运行在 flowEmit().collect所处的协程代码块里
        println("...$coroutineContext")
        for (k in 1 .. 3){
            delay(1000)
            emit(k)
        }
    }
    @Test
    fun flowTest() = runBlocking {
        println("...${this.coroutineContext}")
        //  flowEmit()只是创建了个对象
        // 调用collect的时候才开始执行上面定义的代码块
        flowEmit().collect(object : FlowCollector<Int> {
            // 这个代码其实也是运行在runBlocking协程域里
            override suspend fun emit(value: Int) {
                println("...$value")
            }
        })
        //
        println("...end")
    }

输出
...[CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@2bdd8394, BlockingEventLoop@5f9edf14]
...[CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@2bdd8394, BlockingEventLoop@5f9edf14]
...1
...2
...3
...end

https://www.xamrdz.com/lan/5ub1994819.html

相关文章: