当前位置: 首页>后端>正文

Kotlin Flow

协程中,与仅返回单个值的挂起函数不同,flow可按顺序发出多个值。例如,可以使用flow接收来自数据库的实时更新。

flow在协程的构建基础上,可以提供多值返回。从概念上来说,flow可以通过异步方式处理一组数据序列。前提是所发出的值的类型必须相同。例如,Flow<Int>是返回整数值数据流。

flow与生成一组序列值的Iterator非常相似,但它使用挂起函数通过异步方式生成和消费这个值。

flow包括三个实体:

  • Producer: 会生成添加到数据流中的数据。得益于协程,flow还可以异步产生数据。
  • (Optional)Intermediary: 可以修改发送到flow中的值,或修正flow本身
  • Consumer: 使用flow中的值。
Kotlin Flow,第1张
图 1. 数据流中包含的实体:使用方、可选中介和提供方。

在Android中,仓库(repository)通常是UI数据的提供方,UI是其数据的最终使用方。而其他时候,UI层是用户输入事件的提供方,其他层则是这些事件的使用方。提供方和使用方之间的层通常被称作中介,负责修改数据流,以满足其后层的要求。

创建Flow

如需创建flow,可以使用flow 构造器API。flow构造函数会创建一个新的flow,可以使用emit函数手动将新值发送到flow中。

如以下示例,数据源以固定时间间隔自动获取最新资讯。由于挂起函数不能返回多个连续值,数据源将创建返回flow来满足要求。

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow 构造器在协程内执行。因此,它将受益于相同异步API,但也存在一些限制:

  • flow是有序的。当协程内的Producer调用挂起函数时,Producer会挂起,直到挂起函数返回。在此示例中,Producer会挂起,直到fetchLatestNews网络请求完成为止。只有这样请求结果才会发送到flow中。
  • 使用flow构造器时,Producer不能提供来自不同CoroutineContextemit值。因此,请勿通过创建新协程或使用withContext代码块,在不同CoroutineContext中调用emit。在这些情况下,可以使用其他flow构造器,例如callbackFlow。

修改flow

Intermediary可以利用中间运算符在不消费值的情况下修改数据流。这些运算符都是函数。可在应用于数据库时,设置一系列暂不执行的链式运算,留待将来使用值时执行。如需详细了解中间运算符,请参阅Flow参考文档。

在以下示例中,存储层使用中间运算符map来转换将在View上显示的数据:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

中间运算符可以连接多个,形成链式运算,在数据项被发送到数据流时延迟执行。请注意,仅将一个中间运行符应用于数据流不会启动flow。

从Flow中进行收集

使用终端运算符可触发flow开始监听流的值。如需获取流中所有发出来的值,可以使用collect。如需详细了解终端运算符,请参阅官方Flow文档。

由于collect是挂起函数,因此需要在协程中执行。它接受lambda作为在每个新值上调用的参数。由于它是挂起函数,调用collect的协程可能会挂起,直到该flow关闭。

继续之前的示例,下面将展示一个简单的ViewModel实现,展示其如何使用存储库层中的数据:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

收集数据流会触发提供方刷新最新资讯,并以固定时间间隔发出网络请求。由于提供主始终通过while(true)循环保持活跃状态,因此,在清除ViewModel并取消viewModelScope数据流后,数据流将关闭。

Flow收集可能会由于以下原因而停止:

  • 如上例所示,协程收集取消。些操作也会底层Producer停止活动。
  • Producer完成了发送数据操作。在这种情况下,数据流将关闭,调用collect的协程继续执行。

除非使用其他中间运算符指定流,否则Flow始终为冷式和延迟执行。这意味着,每次在flow上调用终端运算符时,都会执行Producer方的代码。在前面示例中,拥有多个flow收集器会导致数据源以不同的固定时间间隔多次获取最新资讯。如需在多个使用方同时收集优化并共享数据流,请使用shareIn运算符。

捕获异常

Producer的数据实现可来自第三方库。这意味着可能会引发异常。如需处理这些异常,请使用catch中间运算符。

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

在之前的示例中,发生异常时,系统不会调用collect的lambda参数,因为未收到新数据项。catch还可执行emit操作,向flow发出数据。示例如下:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

在不同的CoroutineContext中执行

默认情况下,flow构造器的producer会通过从协程的CoroutineContext上执行,并且无法从不同的CoroutineContext对值执行emit操作。在某些情况下,可以跳出这个限制。如上示例代码中,存储层不应在viewModelScope所使用的Dispatchers.Main上执行。如需更改flowCoroutineContext,请使用中间运算符flowOnflowOn会更改上流数据流的CoroutineContext,这表示会在flowOn之前(或之上)producer以及任何中间运行符都会在传入的这个CoroutineContext上执行。下游数据不会受到影响。如果有多个flowOn运算符,每个运算符都会更改当前位置的上流数据流。

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Jetpack库中的Flow

许多的Jetpack库已集成了flow,并且在Android第三方库中也非常受欢迎。flow非常适合实时数据更新和无限数据流。

比如使用Flow with Room接收有关数据库更改的通知。在使用Room DAO时,返回flow类型以获取实时更新。

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

每当Example数据表发生更改时,系统都会发出数据库更新的列表。

将基于回调的API转换为数据流

callbackFlow是一个flow构造器,允许将基于回调的API转换为数据流。如:Firebase Firestore Android API 会使用回调。

?注意:从 24.3.0 版开始,firestore-ktx 包含返回 Flowsnapshots() 扩展,因此您无需自行针对此特定 API 执行这一转换。

如需将这些 API 转换为数据流并监听 Firestore 数据库的更新,可使用以下代码:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference= null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow构建器,callbackFlow允许从不同的CoroutineContext替换为send函数或协程之外的trySend函数。

在协程内部,callbackFlow 会使用通道,它在概念上与阻塞队列非常相似。通道都有容量配置,限定了可缓冲元素数的上限。在 callbackFlow 中所创建通道的默认容量为 64 个元素。当您尝试向完整通道添加新元素时,send 会将数据提供方挂起,直到新元素有空间为止,而 offer 不会将相关元素添加到通道中,并会立即返回 false


https://www.xamrdz.com/backend/3c41995572.html

相关文章: