当前位置: 首页>后端>正文

Kotlin SharedFlow 使用

前言

与Flow(冷流)不同,SharedFlow是热流。它可以在多个消费者之间共享数据,并且可以在任何时候发射新值。这使得它非常适合用于多个消费者需要访问相同数据的情况。不过本文并不打算深入讲解SharedFlow原理,而是从结合demo从使用上来带大家熟悉其特性。

SharedFlow 使用

最基础的生产消费模型

 runBlocking {
        val sharedFlow = MutableSharedFlow<Int>()
        launch {
//消费者接收数据
            sharedFlow.collect {
                println("collect: $it")
            }
        }
        delay(100) //确保已经订阅
      //生产者发射数据
        sharedFlow.emit(1)
    }

输出:collect: 1

这种最简单的模式下,是看到了预期的打印。这种应该是大家都能理解的生产者-消费者模型。

消费者没有在单独的协程

 runBlocking {
        val sharedFlow = MutableSharedFlow<Int>()

        sharedFlow.collect {
            println("collect: $it")
        }

        println("wait emit")

        delay(100) //确保已经订阅
        sharedFlow.emit(1)
    }

猜一下结果?
没有打印输出

注意:这里区别是collect没有在单独的协程调用。因为collect是个挂起函数,会让当前协程挂起。由于生产者还没生产数据,消费者调用collect时发现没数据后便挂起协程。所以生产者和消费者要处在不同的协程里。

生产者先发射,消费者再接收

 runBlocking {
        val sharedFlow = MutableSharedFlow<Int>()
        sharedFlow.emit(1)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }

结果:collect没有收到数据。
原因:先发射了数据,此时消费者还没有订阅,导致数据丢失。这也就说明了SharedFlow默认是没有粘性的。

关于”粘性“:对于新的订阅者重放其已发出的值。这意味着当一个新的订阅者被添加到一个流中时,它将接收到流先前发出的所有值,即使它们在订阅之前已经被发出。

我们大胆猜想下,要让SharedFlow具备粘性,就应该让其具有缓存机制。

历史数据的重放机制
用过livedata的人都只知道,即使先更新了数据,但每次添加了新的观察者,都能收到最新的数据,但SharedFlow默认是不具备这种能力的。但是这并不代表SharedFlow不行,而是需要一定的配置才能实现这种能力,其实SharedFlow这方面能力比livedata更强大,LiveData只能收到一个最新的值,但是SharedFlow经过配置之后是可以收到多个发射的历史数据。

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {

先来看下MutableSharedFlow的构造方法中的参数

  • replay:重放次数。可以给订阅者发送之前已经发射的数据,而发射数据的个数就是通过replay指定的;
  • extraBufferCapacity:是指Buffer中除了replay外,额外增加的缓存数量;
  • onBufferOverflow:缓存区满了之后的溢出策略,有3种策略可供选择。默认BufferOverflow.SUSPEND,缓存溢出时挂起;另外还有2种丢弃策略,DROP_OLDESTDROP_LATEST,分别是溢出时丢弃缓冲区中最旧的值和最新的值。

只配置replay

 runBlocking {
        val sharedFlow = MutableSharedFlow<Int>(replay = 1)
        sharedFlow.emit(1)

        launch {
            sharedFlow.collect {
                println("collect: $it")
            }
        }
    }

结果:collect是收到了数据

这是replay缓存区缓存数量为1,所以后面添加的收集者可以收到历史数据。当然这个数量,你可以任意指定。

设置extraBufferCapacity

  runBlocking {
       val sharedFlow = MutableSharedFlow<Int>(
            replay = 2,
            extraBufferCapacity = 1
        )

        sharedFlow.emit(1)
        sharedFlow.emit(2)
        sharedFlow.emit(3)


        launch {
            sharedFlow.collect {
                println("collect: $it")
                delay(1000)
            }
        }

    }

先猜下这段代码的结果是?

结果:collect: 2 collect: 3

可能很多人会很奇怪,前面说过缓存数量bufferSize是replay + extraBufferCapacity。那这段代码中bufferSize是3啊,为什么collect只有2条数据呢?我敢说,这个问题很多用ShareFlow的人都没有搞清楚(至少我在网上看到的博客是这样的)。

我先不解释,我们再来看一个例子

 runBlocking {
        val sharedFlow = MutableSharedFlow<Int>(
            replay = 2,
            extraBufferCapacity = 1
        )
        sharedFlow.emit(1)
        sharedFlow.emit(2)
        launch {
            sharedFlow.collect {
                println("collect: $it")
                delay(1000) //模拟处理背压
            }
        }
        delay(200)
        sharedFlow.emit(3)
        sharedFlow.emit(4)
    }

结果:collect能收到4个数据

在这段代码中,replay和extraBufferCapacity没有变化。区别是先发射了2条数据1,2;然后开始订阅,等订阅了之后再发射数据3,4。而前一段代码是先发射完所有数据在开始订阅。

解析:extraBufferCapacity 是用于控制额外缓冲区的容量。额外缓冲区是一个用于存储新值的缓冲区,当重放缓冲区已满时,新值将被存储在额外缓冲区中,直到有收集器准备好收集它为止。总结下,extraBufferCapacity对应的额外缓冲区是要在有收集者订阅之后才能起作用,否则只有replay重放缓存区起作用。

emit也是个挂起函数

 runBlocking {
        val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0)

        launch {
            sharedFlow.collect {
                println("collect: $it")
                delay(1000)
            }
        }

        launch {
            sharedFlow.collect {
                println("collect2: $it")
                delay(2000)
            }
        }

        delay(200)
        for (value in 1 until 4){
            println("emit value: $value")
            sharedFlow.emit(value)
        }

    }

输出:
emit value: 1
collect: 1
collect2: 1
emit value: 2
collect: 2
collect2: 2
emit value: 3
collect: 3
collect2: 3

从打印可以看出:生产者要等待消费者消费完数据才进行下一次emit

通过replay或者extraBufferCapacity解决背压问题
SharedFlow 是一种具有背压支持的流。背压是一种流量控制机制,用于控制数据流的速率,以确保接收端能够处理数据的速度不超过其处理能力。在 SharedFlow 中,背压机制通过以下方式实现:

当缓冲区已满时, emit 函数将会被挂起,直到缓冲区中有足够的空间来接收新的值。这样可以避免生产者发送大量的数据,而消费者无法及时处理的情况,从而导致内存溢出或应用程序崩溃。

当消费者收到新值时,如果其处理速度较慢,那么生产者将会被挂起,直到消费者处理完所有的值,并释放了足够的空间来接收新的值。这样可以避免消费者被压垮,从而导致应用程序变得不可用。

因此, SharedFlow 的背压机制可以确保生产者和消费者之间的数据流量得到平衡,以避免出现数据丢失或内存泄漏等问题。这使得 SharedFlow 成为处理大量数据的可靠和高效的方案之一。

但是这样有个问题,生产者速度可能被消费者拖累。先来看一段代码:

runBlocking {
//        val sharedFlow = MutableSharedFlow<Int>(replay = 3, extraBufferCapacity = 0)
        val sharedFlow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 3)

        launch {
            sharedFlow.collect {
                println("collect: $it")
                delay(1000)
            }
        }

        launch {
            sharedFlow.collect {
                println("collect2: $it")
                delay(2000)
            }
        }

        delay(200)
        for (value in 1 until 4){
            println("emit value: $value")
            sharedFlow.emit(value)
        }

输出:emit value: 1
emit value: 2
emit value: 3
collect: 1
collect2: 1
collect: 2
collect2: 2
collect: 3
collect2: 3

这段代码中消费者的速度明显比生产者慢很多,但我们通过配置replay或者extraBufferCapacity来设置了缓存buffer,就可以避免消费者拖累生产者速度的问题。


https://www.xamrdz.com/backend/3qd1941450.html

相关文章: