前言
Flow
是kotlin协程中的流。RxJava就是流式编程的库。Flow属于冷流对应RxJava中的Observable Flowable Single MayBe和Completable等。Kotlin协程中的热流实现MutableSharedFlow和MutableStateFlow等,对应RxJava中热流PublisherSubject和BehaviorSubject。
- 冷流:较少的访问和修改
- 热流:频繁地读取和更新
Flow使用
fun main() {
runBlocking (Dispatchers.Default){
// 发送10个元素,从0到9
val myFlow = flow {
repeat(10){
emit(it)
}
}
launch {
myFlow.collect{
println("Coroutine1:$it")
}
}
launch {
myFlow.collect{
println("Coroutine2:$it")
}
}
}
}
协程1和2通过Flow.collect
订阅Flow。
fun main() {
runBlocking (Dispatchers.Default){
// 发送10个元素,从0到9
val myFlow = flow {
repeat(10){
// 修改原来的CoroutineContext,会异常
withContext(Dispatchers.IO){
emit(it)
}
}
}
launch {
myFlow.collect{
println("Coroutine1:$it")
}
}
}
}
Flow限制,不能修改原来的CoroutineContext。可以使用ChannelFlow
就能正常使用。
fun main() {
runBlocking (Dispatchers.Default){
// 发送10个元素,从0到9
val myFlow = channelFlow {
repeat(10){
// 可以修改原来的CoroutineContext
withContext(Dispatchers.IO){
channel.send(it)
}
}
}
launch {
myFlow.collect{
println("Coroutine1:$it")
}
}
}
}
fun main() {
runBlocking(Dispatchers.Default) {
// 发送10个元素,从0到9
val myFlow = flow {
repeat(10) {
try {
emit(it)
} catch (e: Throwable) {
emit(22)
}
}
}
launch {
myFlow.collect {
if (it == 2) {
// 这里出现异常后,collect订阅就结束了(只打印2次,第三次就异常了)
error("Error")
}
println("Coroutine1:$it")
}
}
}
}
Flow中collect异常,那么订阅就结束了。
Flow工作原理
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
创建SafeFlow,继承于AbstractFlow,订阅调用的是collect。检查当前CoroutineContext和调用的collect方法传入的是否一致,不一致就抛出异常。 Flow被SafeCollector代理去检查异常。 转换前的流称上游Upstream
,处理后再发送到下游Downstream
flatMap操作符
类似于RxJava中的concatMap操作符
fun main() {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
// 将原来的流元素构建成一个新的流(按照原来的流元素输出)
myFlow.flatMapConcat { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}.collect {
println("collect $it")
}
}
}
}
输出:
collect 0
collect 1
collect 10
collect 11
collect 20
collect 21
将原来发送3个元素,通过flatMapConcat()
发送两个元素。越是先发送的元素延迟时间越长,然后按顺序输出6个元素。
flatMapMerge
类似于RxJava中的flatMap
fun main() {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
// 将原来的流元素构建成一个新的流(默认并发16个)谁先执行完就发送谁
myFlow.flatMapMerge { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}.collect {
println("collect $it")
}
}
}
}
输出:
collect 20
collect 21
collect 10
collect 11
collect 0
collect 1
不会保证原来的顺序,哪个流先处理完就先发送数据。concurrency
默认值16,并行执行的数量。当concurrency为1时和flatMapConcat一样。
fun main() {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
// 将原来的流元素构建成一个新的流(并发数是2,达到2个的时候等待然后再执行下一个)
myFlow.flatMapMerge(2) { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}.collect {
println("collect $it")
}
}
}
}
输出:
collect 10
collect 11
collect 0
collect 1
collect 20
collect 21
flatMapLatest
类似于RxJava中的switchMap
fun main() {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
// 前面没执行完的Flow会被取消,然后被后续的Flow替换
myFlow.flatMapLatest { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}.collect {
println("collect $it")
}
}
}
}
输出:
collect 20
collect 21
总结
介绍了Flow常用的操作符map flatMap(串式) flatMapMerge(并发) flatmapLatest(取代旧的)等简单使用。