当前位置: 首页>后端>正文

[一看就懂] 图解 Kotlin SharedFlow 缓存系统

前言

Kotlin 为我们提供了两种创建“热流”的工具:StateFlowSharedFlow。StateFlow 经常被用来替代 LiveData 充当架构组件使用,所以大家相对熟悉。其实 StateFlow 只是 SharedFlow 的一种特化形式,SharedFlow 的功能更强大、使用场景更多,这得益于其自带的缓存系统,本文用图解的方式,带大家更形象地理解 SharedFlow 的缓存系统。

创建 SharedFlow 需要使用到 MutableSharedFlow() 方法,我们通过方法的三个参数配置缓存:

fun <T> MutableSharedFlow(
    replay: Int = 0, 
    extraBufferCapacity: Int = 0, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

接下来,我们通过时序图的形式介绍这三个关键参数对缓存的影响。正文之前让我们先统一一下用语:

  • Emitter:Flow 数据的生产者,从上游发射数据
  • Subcriber:Flow 数据的消费者,在下游接收数据

replay

当 Subscriber 订阅 SharedFlow 时,有机会接收到之前已发送过的数据,replay 指定了可以收到 subscribe 之前数据的数量。replay 不能为负数,默认值为 0 表示 Subscriber 只能接收到 subscribe 之后 emit 的数据:

上图展示的是 replay = 0 的情况,Subscriber 无法收到 subscribe 之前 emit 的 ?,只能接收到 ? 和 ?。

当 replay = n ( n > 0)时,SharedFlow 会启用缓存,此时 BufferSize 为 n,意味着可以缓存发射过的最近 n 个数据,并发送给新增的 Subscriber。

上图以 n = 1 为例 :

  1. Emitter 发送 ? ,并被 Buffer 缓存
  2. Subscriber 订阅 SharedFlow 后,接收到缓存的 ?
  3. Emitter 相继发送 ? ? ,Buffer 缓存的数据相继依次被更新

在生产者消费者模型中,有时消费的速度赶不及生产,此时要加以控制,要么停止生产,要么丢弃数据。SharedFlow 也同样如此。有时 Subscriber 的处理速度较慢,Buffer 缓存的数据得不到及时处理,当 Buffer 为空时,emit 默认将会被挂起 ( onBufferOverflow = SUSPEND)

上面的图展示了 replay = 1 时 emit 发生 suspend 场景:

  1. Emitter 发送 ? 并被缓存
  2. Subscriber 订阅 SharedFlow ,接收 replay 的 ? 开始处理
  3. Emitter 发送 ? ,缓存数据更新为 ? ,由于 Subscriber 对 ? 的处理尚未结束,? 在缓存中没有及时被消费
  4. Emitter 发送 ?,由于缓存的 ? 尚未被 Subscriber 消费,emit 发生挂起
  5. Subscriber 开始消费 ? ,Buffer 缓存 ? , Emitter 可以继续 emit 新数据

注意 SharedFlow 作为一个多播可以有多个 Subscriber,所以上面例子中,? 被消费的时间点,取决于最后一个开始处理的 Subscriber。

extraBufferCapacity

extraBufferCapacity 中的 extra 表示 replay-cache 之外为 Buffer 还可以额外追加的缓存。

若 replay = n, extraBufferCapacity = m,则 BufferSize = m + n

extraBufferCapacity 默认为 0,设置 extraBufferCapacity 有助于提升 Emitter 的吞吐量

在上图的基础之上,我们再设置 extraBufferCapacity = 1,效果如下图:

上图中 BufferSize = 1 + 1 = 2 :

  1. Emitter 发送 ? 并得到 Subscriber1 的处理 ,? 作为 replay 的一个数据被缓存,
  2. Emitter 发送 ?,Buffer 中 replay-cache 的数据更新为 ?
  3. Emitter 发送 ?,Buffer 在存储了 replay 数据 ? 之上,作为 extra 又存储了 ?
  4. Emitter 发送 ?,此时 Buffer 已没有空余位置,emit 挂起
  5. Subscriber2 订阅 SharedFlow。虽然此时 Buffer 中存有 ? ? 两个数据,但是由于 replay = 1,所以 Subscriber2 只能收到最近的一个数据 ?
  6. Subscriber1 处理完 ? 后,依次处理 Buffer 中的下一个数据,开始消费 ?
  7. 对于 SharedFlow 来说,已经不存在没有消费 ? 的 Subscriber,? 移除缓存,? 的 emit 继续,并进入缓存,此时 Buffer 又有两个数据 ? ? ,
  8. Subscriber1 处理完 ? ,开始消费 ?
  9. 不存在没有消费 ? 的 Subscriber, ? 移除缓存。

onBufferOverflow

前面的例子中,当 Buffer 被填满时,emit 会被挂起,这都是建立在 onBufferOverflow 为 SUSPEND 的前提下的。onBufferOverflow 用来指定缓存移除时的策略,除了默认的 SUSPEND,还有两个数据丢弃策略:

  • DROP_LATEST:丢弃最新的数据
  • DROP_OLDEST:丢弃最老的数据

需要特别注意的是,当 BufferSize = 0 时,extraBufferCapacity 只支持 SUSPEND,其他丢弃策略是无效的。这很好理解,因为 Buffer 中没有数据,所以丢弃无从下手,所以启动丢弃策略的前提是 Buffer 至少有一个缓冲区,且数据被填满

上图展示 DROP_LATEST 的效果。假设 replay = 2,extra = 0

  1. Emitter 发送 ? 时,由于 ? 已经被消费,所以 Buffer 数据从 ?? 变为 ??
  2. Emitter 发送 ? 时,由于 ? 还未被消费,Buffer 处于填满状态, ? 直接被丢弃
  3. Emitter 发送 ? 时,由于 ? 已经被费,可以移除缓存,Buffer 数据变为 ??

上图展示了 DROP_OLDEST 的效果,与 DROP_LATEST 比较后非常明显,缓存中永远会储存最新的两个数据,但是较老的数据不管有没有被消费,都可能会从 Buffer 移除,所以 Subscriber 可以消费当前最新的数据,但是有可能漏掉中间的数据,比如图中漏掉了 ?

注意:当 extraBufferCapacity 设为 SUSPEND 可以保证 Subscriber 一个不漏的消费掉所有数据,但是会影响 Emitter 的速度;当设置为 DROP_XXX 时,可以保证 emit 调用后立即返回,但是 Subscriber 可能会漏掉部分数据。

如果我们不想让 emit 发生挂起,除了设置 DROP_XXX 之外,还有一个方法就是调用 tryEmit,这是一个非 suspend 版本的 emit

abstract suspend override fun emit(value: T)

abstract fun tryEmit(value: T): Boolean

tryEmit 返回一个 boolean 值,你可以这样判断返回值,当使用 emit 会挂起时,使用 tryEmit 会返回 false,其余情况都是 true。这意味着 tryEmit 返回 false 的前提是 extraBufferCapacity 必须设为 SUSPEND,且 Buffer 中空余位置为 0 。此时使用 tryEmit 的效果等同于 DROP_LATEST。

SharedFlow Buffer

前面介绍的 MutableSharedFlow 的三个参数,其本质都是围绕 SharedFlow 的 Buffer 进行工作的。那么这个 Buffer 具体结构是怎样的呢?

上面这个图是 SharedFlow 源码中关于 Buffer 的注释,这个图形象地告诉了我们 Buffer 是一个线性数据结构(就是一个普通的数组 Array<Any?>),但是这个图不能直观反应 Buffer 运行机制。下面通过一个例子,看一下 Buffer 在运行时的具体更新过程:

val sharedFlow = MutableSharedFlow<Int>(
    replay = 2, 
    extraBufferCapacity = 2,
    onBufferOverflow = BufferOverflow.SUSPEND
)
var emitValue = 1

fun main() {
    runBlocking {
        launch {
            sharedFlow.onEach {
                delay(200) // simulate the consume of data
            }.collect()
        }

        repeat(12) {
            sharedFlow.emit(emitValue)
            emitValue++
            delay(50)
        }
    }
}

上面的代码很简单,SharedFlow 的 BufferSize = 2+2 = 4,Emitter 生产的速度大于 Subscriber 消费的速度,所以过程中会出现 Buffer 的填充和更新,下面依旧用图的方式展示 Buffer 的变化

先看一下代码对应的时序图:

有前面的介绍,相信这个时序图很容易理解,这里就不再赘述了,下面重点图解一下 Buffer 的内存变化。SharedFlow 的 Buffer 本质上是一个基于 Array 实现的 queue,通过指针移动从往队列增删元素,避免了元素在实际数组中的移动。这里关键的指针有三个:

  • head:队列的 head 指向 Buffer 的第一个有效数据,这是时间上最早进入缓存的数据,在数据被所有的 Subscriber 消费之前不会移除缓存。因此 head 也代表了最慢的 Subscriber 的处理进度
  • replay:Buffer 为 replay-cache 预留空间的其实位置,当有新的 Subscriber 订阅发生时,从此位置开始处理数据。
  • end:新数据进入缓存时的位置,end 这也代表了最快的 Subscriber 的处理进度。

如果 bufferSize 表示当前 Buffer 中存储数据的个数,则我们可知三指针 index 符合如下关系:

  • replay <= head + bufferSize
  • end = head + bufferSize

了解了三指针的含义后,我们再来看上图中的 Buffer 是如何工作的:

最后,总结一下 Buffer 的特点:

  • 基于数组实现,当数组空间不够时进行 2n 的扩容
  • 元素进入数组后的位置保持不变,通过移动指针,决定数据的消费起点
  • 指针移动到数组尾部后,会重新指向头部,数组空间可循环使用

最后

如果想要成为架构师或想突破20~30K薪资范畴,那就不要局限在编码,业务,要会选型、扩展,提升编程思维。此外,良好的职业规划也很重要,学习的习惯很重要,但是最重要的还是要能持之以恒,任何不能坚持落实的计划都是空谈。

如果你没有方向,这里给大家分享一套由阿里高级架构师编写的《Android八大模块进阶笔记》,帮大家将杂乱、零散、碎片化的知识进行体系化的整理,让大家系统而高效地掌握Android开发的各个知识点。
[图片上传失败...(image-ab6ce-1698997964490)]
相对于我们平时看的碎片化内容,这份笔记的知识点更系统化,更容易理解和记忆,是严格按照知识体系编排的。

一、架构师筑基必备技能

1、深入理解Java泛型
2、注解深入浅出
3、并发编程
4、数据传输与序列化
5、Java虚拟机原理
6、高效IO

二、Android百大框架源码解析

1.Retrofit 2.0源码解析
2.Okhttp3源码解析
3.ButterKnife源码解析
4.MPAndroidChart 源码解析
5.Glide源码解析
6.Leakcanary 源码解析
7.Universal-lmage-Loader源码解析
8.EventBus 3.0源码解析
9.zxing源码分析
10.Picasso源码解析
11.LottieAndroid使用详解及源码解析
12.Fresco 源码分析——图片加载流程

三、Android性能优化实战解析

  • 腾讯Bugly:对字符串匹配算法的一点理解
  • 爱奇艺:安卓APP崩溃捕获方案——xCrash
  • 字节跳动:深入理解Gradle框架之一:Plugin, Extension, buildSrc
  • 百度APP技术:Android H5首屏优化实践
  • 支付宝客户端架构解析:Android 客户端启动速度优化之「垃圾回收」
  • 携程:从智行 Android 项目看组件化架构实践
  • 网易新闻构建优化:如何让你的构建速度“势如闪电”?

四、高级kotlin强化实战

1、Kotlin入门教程
2、Kotlin 实战避坑指南
3、项目实战《Kotlin Jetpack 实战》

  • 从一个膜拜大神的 Demo 开始

  • Kotlin 写 Gradle 脚本是一种什么体验?

  • Kotlin 编程的三重境界

  • Kotlin 高阶函数

  • Kotlin 泛型

  • Kotlin 扩展

  • Kotlin 委托

  • 协程“不为人知”的调试技巧

  • 图解协程:suspend

五、Android高级UI开源框架进阶解密

1.SmartRefreshLayout的使用
2.Android之PullToRefresh控件源码解析
3.Android-PullToRefresh下拉刷新库基本用法
4.LoadSir-高效易用的加载反馈页管理框架
5.Android通用LoadingView加载框架详解
6.MPAndroidChart实现LineChart(折线图)
7.hellocharts-android使用指南
8.SmartTable使用指南
9.开源项目android-uitableview介绍
10.ExcelPanel 使用指南
11.Android开源项目SlidingMenu深切解析
12.MaterialDrawer使用指南

六、NDK模块开发

1、NDK 模块开发
2、JNI 模块
3、Native 开发工具
4、Linux 编程
5、底层图片处理
6、音视频开发
7、机器学习

七、Flutter技术进阶

1、Flutter跨平台开发概述
2、Windows中Flutter开发环境搭建
3、编写你的第一个Flutter APP
4、Flutter开发环境搭建和调试
5、Dart语法篇之基础语法(一)
6、Dart语法篇之集合的使用与源码解析(二)
7、Dart语法篇之集合操作符函数与源码分析(三)

八、微信小程序开发

1、小程序概述及入门
2、小程序UI开发
3、API操作
4、购物商场项目实战……


https://www.xamrdz.com/backend/3bb1940100.html

相关文章: