前言
不知道大家有没有跟我一样的感受:即使自己用心在网上学过协程和Flow了,但过了一段时间就又忘掉了。这大部分的原因其实是因为我们缺少实战。我平时工作里根本就接触不到协程和Flow,自己又不敢硬往上写,万一出问题了咋整?所以一直就处于理论学习阶段,导致我学了就跟没学一样。
今天就带大家一起来解决这个问题,通过几个简单的Demo和实战,巩固我们Kotlin协程和Flow的知识体系,从而能更有信心地用到实际项目中去。
协程战前动员
协程实战之前,有几点需要注意的细节需要提前说一下,这些也是平时开发中比较容易遇到的。
协程的取消
调用cancel
方法并不保证能取消协程,取消协程的前提是代码块在执行过程中对协程的状态进行了校验。常见的挂起函数如withContext
、delay
、yield
都有做校验。但如果是普通的函数,即使cancel
也是会继续执行的。若要普通函数也能即时取消,需要在内部对CoroutineScope
的isActive
属性进行check。
val cancelJob = scope.launch {
for(i in 0..5) {
// 代码1
if (isActive) {
Log.d("11", "avc")
}
}
}
cancelJob.cancel()
如上在代码1处,如果不加isActive
属性的判断,即使cancel
了,这个日志仍然是会循环打印的。
协程的异常处理
协程代码块中仍然是使用try-catch进行异常捕获。CoroutineExceptionHandler
作为CoroutineContext
的基类,是用于捕获未捕获的异常,类似于Java.Thread.DefaultUncaughtExceptionHandler
。
默认情况下,协程中未捕获的异常是会传递到其同级与父级协程的。若不想有这种传递,可以用SupervisorScope
或者SupervisorJob
启动协程,此时子协程若发生异常不会扩散。但SupervisorJob
改变的只是异常的传递方式,而不具备捕获异常的能力,因此需要用CoroutineExceptionHandler
去捕获异常,否则子协程的异常仍然会造成应用的崩溃。
CoroutineScope(context).launch {
val supervisorJob = SupervisorJob()
with(CoroutineScope(context + supervisorJob)) {
// 即使是SupervisorJob,子协程如果发生异常,仍然会崩溃,需要用try catch或者CoroutineExceptionHandler捕获
val firstChild = launch (CoroutineExceptionHandler{_, exception -> {}} ) {
throw RuntimeException()
}
val secondChild = launch {
firstChild.join()
try {
delay(Long.MAX_VALUE)
} catch (e: Exception) {
print("cancelled because supervisor job is cancelled")
}
}
firstChild.join()
// supervisorJob取消后,所有子协程也会取消
supervisorJob.cancel()
secondChild.join()
}
}
如果没用SupervisorScope
或者SupervisorJob
改变异常的传递方式,那么子协程的异常是会委托给父协程去处理的。所以此时即使在子协程的构造方法中声明了CoroutineExceptionHandler
,也捕获不到异常,这个时候就需要在根协程构造的时候或者Scope初始化的时候去声明CoroutineExceptionHandler
。
CoroutineScope(context + CoroutineExceptionHandler{_, exception -> {}}).launch {
with(CoroutineScope(context {
// 子协程发生异常,传递到父协程,父协程捕获
val firstChild = launch {
throw RuntimeException()
}
val secondChild = launch {
firstChild.join()
try {
delay(Long.MAX_VALUE)
} catch (e: Exception) {
print("cancelled because supervisor job is cancelled")
}
}
firstChild.join()
// supervisorJob取消后,所有子协程也会取消
supervisorJob.cancel()
secondChild.join()
}
}
另外,想要改变哪个协程的异常传递方式,就将SupervisorJob
声明在哪个协程。如果SupervisorJob
声明在父协程,子协程没声明,那么子协程的CoroutineContext
是会覆盖父类的,异常仍会到传递到父类。
CoroutineScope(context + CoroutineExceptionHandler{_, exception -> {}} + SupervisorJob()).launch {
with(CoroutineScope(context {
// 子协程发生异常,传递到父协程,父协程捕获
val firstChild = launch {
throw RuntimeException()
}
val secondChild = launch {
firstChild.join()
try {
delay(Long.MAX_VALUE)
} catch (e: Exception) {
print("cancelled because supervisor job is cancelled")
}
}
firstChild.join()
// supervisorJob取消后,所有子协程也会取消
supervisorJob.cancel()
secondChild.join()
}
}
如上,子协程发生的异常,仍然会传递到父协程,父协程的SupervisorJob
并不会传递给子线程。
协程的资源同步
传统的synchronized
关键字或者Kotlin提供的Mutex互斥锁可以解决资源同步的问题。
CoroutineScope(context).launch {
var sum = 0
launch {
synchronized(this@launch) {
sum++
}
}
launch {
synchronized(this@launch) {
sum++
}
}
}
synchronized
的lock要用外层的对象,而不是this
,否则lock就不是互斥的了。而Mutex用法上也很简单,性能上也差不了多少。
CoroutineScope(context).launch {
var sum = 0
val mutex = Mutex()
launch {
mutex.lock()
sum++
mutex.unlock()
}
launch {
mutex.lock()
sum++
mutex.unlock()
}
}
协程实战
下面就是协程的实战。理论上,因为协程是一个异步框架,所以哪里需要开线程,哪里就能用协程!而并发或串行依赖任务更需要协程!但是如果我们要将协程运用到实际项目中,我们不可能非常随意地先用线程的地方直接就替换为协程,那么哪些地方可以适合让我们“练练手”呢?这里介绍四处:LifecycleOwner、ViewModel、数据层、LiveData。
LifecycleOwner使用协程
在Activity
或者Fragment
中,可以使用lifecycleScope.launch
启动协程,这种方式启动的协程会随着LifecycleOwner
的销毁而销毁,避免内存泄漏。但如果架构符合UI与数据分离的话,一般很少会在UI层使用到协程(除非使用Flow,后面会看到)。
class UserActivity {
lifecycleScope.launch {
}
lifecycle.coroutineScope.launch {
}
}
这两种启动协程的写法是一样的。另外还有launchWhenXXX
方法可以让我们指定在某个生命周期启动协程:
class UserActivity {
lifecycleScope.launchWhenResumed {
}
}
launchWhenResumed
就是在onResume
时启动协程,不在onResume
生命周期后就会将协程挂起。比如APP切换后台再切回来,协程就会先挂起再恢复。
ViewModel使用协程
viewModelScope.launch
启动的协程生命周期跟随ViewModel。一般在调用下层Repository接口时需要启动一个协程,从而能调用Repository层的挂起函数。
class UserViewModel {
fun getUser(userName: String) {
viewModelScope.launch(Dispatchers.Default) {
userRepository.getUser(userName)
}
}
}
数据层使用协程
数据层用withContext
切换到IO或Default线程池,进行网络数据的请求或者内存、持久层的数据读写。需要开多个协程并行执行任务时,可以LiveData监听结果,ViewModel持有这个LiveData,UI层监听。如果需要多个有依赖关系的协程串行执行,就用async+await方法。
class UserRepository {
private val _userMutableLiveData = MutableLiveData<User>()
val userLiveData: LiveData<User>
get() = _userMutableLiveData
suspend fun getUser(username: String) =
withContext(Dispatchers.Default) {
launch(SupervisorJob()) {
val response = userService.getUser(username)
if (response.isSuccessful && response.body() != null) {
_userMutableLiveData.value = response.body()
}
}
launch(SupervisorJob()) {
UserDB.getUser(username)?.apply {
_userMutableLiveData.value = this
}
}
}
}
这里并行去本地和网络读取User的数据,读到之后可以用LiveData将数据发出去。如果没有并行任务的要求就比较简单了,挂起函数可以直接返回结构体:
class UserRepository{
suspend fun getUserHome(username: String): Home=
withContext(Dispatchers.Default) {
val getUserDeferred = async(SupervisorJob()) {
userService.getUser(username)
}
val response = getUserDeferred.await()
if (response.isSuccessful && response.body() != null) {
userService.getUserHome(response.body()!!).body()
} else {
null
}
}
}
上面代码用async和await函数实现串行任务。
LiveData使用协程
上面ViewModel层从Repository层拿到Model后,还是需要声明LiveData暴露给UI层才可以完成数据的完整传递。 比如这样:
class UserViewModel {
val userLiveData = userRepository.userLiveData
fun getUser(userName: String) {
viewModelScope.launch(Dispatchers.Default) {
userRepository.getUser(userName)
}
}
}
class UserActivity {
...
viewModel.userLiveData.observe(this) {
}
}
但LiveData提供了liveData
函数,可以用liveData{}
直接写协程代码块,在ViewModel中直接作为函数结果,拿到的数据通过emit发送出去。
class UserViewModel {
fun getUserHome(userName: String) = liveData {
viewModelScope.launch(Dispatchers.Default) {
emit(userRepository.getUserHome(userName))
}
}
}
这样有一个好处就是ViewModel层不用再特意声明LiveData变量给UI层了。UI层原先需要调用ViewModel的一个方法并且对暴露的LiveData变量做监听,用liveData{}
的话UI层只需要调用这一个方法就同时完成了请求与监听。
class UserActivity {
...
viewModel.getUserHome("abc").observe(this) {
}
}
小结
LifecycleOwner、ViewModel、数据层、LiveData处使用到协程的机会是比较多的,使用过程中再多注意一下协程的取消,不要内存泄漏;注意协程的异常处理;如果多个协程之间涉及到资源同步,就用synchronized或者Mutex解决。
Flow战前动员
在写这篇文章之前,我一直有一个疑问:单在异步线程处理上,协程已经足够优秀,配合LiveData处理数据的渲染,足以应付大多数的需求。那为什么又需要Flow呢?在什么地方使用Flow才是比较合适的呢?
针对第一个问题,为什么需要Flow?flow既能提供异步线程框架,又能处理数据,相当于是协程+liveData的结合体。并且flow可以随协程取消,并且处理更复杂的数据流,也可以设置数据重发量并解决背压,这是LiveData做不到的。就是在写法上可能会有坑,操作符相对LiveData比较复杂,处理起来也比较麻烦,比如collect
末端操作符要注意不能影响到主线程。
那既然Flow这么强大,是不是无脑写Flow,LiveData可以直接淘汰了呢?这个就见仁见智了,不同团队不同项目做出的选择肯定也是不一样的。在我理解下来,因为我们大部分的业务场景涉及到的数据流是比较简单的,而且不需要做什么复杂的线程切换,那我们就直接用LiveData,非常简单。如果数据流比较复杂,需要做线程切换,又或者要变换数据,就用Flow。如果在这基础上你还需要Flow重发数据,那就选择SharedFlow。如果你只需要重发最新的数据,也可以选择StateFlow,但需要注意StateFlow不会发送重复的数据。
所以针对这个原则,我们就可以解决第二个问题,在什么地方使用Flow比较合适,或者说比较容易上手呢?那就是Repository层。因为通常我们需要在Repository层获取网络数据或者获取本地、内存的数据,有时候不同数据源的数据是需要进行结合或者变换的,所以这里用到Flow的可能性是比较大的。Repository对数据进行处理后,ViewModel拿到的其实就是一个完整可用的数据结构了,ViewModel就可以简单地用LiveData完成与UI层的数据传递。
如果你一定要在UI层进行Flow的监听,那就需要在UI层起一个协程。这里需要注意的是,这里直接launch协程的话会不够安全,因为APP在后台仍会接收Flow数据的更新,容易引发崩溃。那么我们可以用launchedOnXXX
或者repeatOnLifecycle
来将flow与生命周期关联起来。
class UserActivity {
lifecycleScope.launchWhenResumed {
viewModel.userFlow.collect {
}
}
lifecycleScope.launch(Dispatchers.Default) {
repeatOnLifecycle(Lifecycle.State.RESUMED) {
viewModel.userFlow.collect {
}
}
}
}
launchedOnXXX
在不符合生命周期的情况下会暂停flow。repeatOnLifecycle
在每次触发时,会关掉之前的flow重新启动。
Flow实战
那么下面给大家看一下我是怎么写Repository->ViewModel->UI层的数据流转的。
class UserRepository {
/**
* Flow处理Repository层数据
*/
suspend fun getObservableUserHome(username: String): Flow<Home?> {
// 本地数据
return UserDB.getUserHome(username)
// 数据转换
.map { Home("a") }
// 网络数据
.flatMapConcat {
flow {
emit(userService.getUserHome(User("", "")).body())
}
}
}
/**
* Flow并行协程
*/
suspend fun getObservableUser(username: String): Flow<User?> {
return withContext(Dispatchers.Default) {
flowOf(
// 本地数据
UserDB.getObservableUser(username),
// 网络数据
userService.getObservableUser(username).map { it.body() }
).flattenConcat()
}
}
}
getObservableUserHome
方法获取了本地数据之后,再经过一次数据转换,再去获取网络数据。这是典型的串行任务,如果没有串行任务就更简单了,去掉flatMapConcat
就可以了。
getObservableUser
则是并行任务,获取本地数据和网络数据的两个flow是并发执行的。这里需要注意flattenConcat()
操作符只能先接收前一个flow的emit
,再接收后一个flow的emit
。
然后我们在ViewModel层调用Repository层的方法:
class UserViewModel {
suspend fun getObservableUser(userName: String): LiveData<User?> {
return userRepository.getObservableUser(userName).asLiveData()
}
suspend fun getObservableUserHome(userName: String): LiveData<Home?> {
return userRepository.getObservableUserHome(userName).asLiveData()
}
}
这里我们用asLiveData()
方法直接将flow转换成了LiveData,非常方便,看源码也可以看到里面是包了一层collect
@JvmOverloads
public fun <T> Flow<T>.asLiveData(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT
): LiveData<T> = liveData(context, timeoutInMs) {
collect {
emit(it)
}
}
如果有不一样的监听逻辑,比如collectLast
,也可以自己写。
然后我们就在UI层调用ViewModel方法就可以了:
class UserActivity {
lifecycleScope.launchWhenCreated {
viewModel.getObservableUser("").observe(this@CoroutineActivity) {
}
viewModel.getObservableUserHome("").observe(this@CoroutineActivity) {
}
}
}
小结
那Flow的实战就到这里,我主要把Flow用在了Repository层处理数据,ViewModel和UI层使用LiveData流转数据。当然,因为我自己也是边学习边总结,其中难免会有不对的地方,也希望大佬们多给点建议让我学习一番~