一、操作符
1、转换操作符
map
可将A类型的数据转换成B类型的数据
fun testMap() = runBlocking<Unit> {
(1..3).asFlow(
).map {
"to string $it"
}.collect {
println(it)
}
}
transform
与map类似,transform可以实现更为复杂的变换
fun testTransform() = runBlocking<Unit> {
(1..3).asFlow()
.transform<Int,String> {
emit("response $it")
emit("response $it next")
}
.collect{
println(it)
}
}
take 限长操作符
fun testTake() = runBlocking<Unit> {
flow<Int> {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} catch (e: Exception) {
println("finally in numbers")
}
}.take(2)
.collect {
println("collect num $it")
}
}
//print result
//collect num 1
//collect num 2
// finally in numbers
2、末端操作符
末端操作符是在流上用于启动流收集的挂起函数。collect是最基础的末端操作符,但是有另外一些更方便使用的末端操作符。
- 转化为各种集合,例如toList和toSet。
- 获取第一个(first)值与确保流发射单个(single)值操作符。
- 使用reduce与flod将流规约到单个值
fun testReduce() = runBlocking<Unit> {
(1..50).asFlow().map {
it*it
}.reduce{a,b->
a+b
}
}
//print result
//55
3、zip组合
fun testZip() = runBlocking<Unit> {
val nums = (1..3).asFlow().onEach { delay(300) }
val strs = listOf<String>("One", "Two", "Three").asFlow().onEach {
delay(400)
}
nums.zip(strs) { a, b ->
"$a->$b"
}.collect {
println(it)
}
}
//print result
//1->One
//2->Two
//3->Three
//total duration 1496
4、展平流
流表示异步接收的值序列,所以容易遇到这样的情况:每个值都会触发另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,存在一系列的展平操作符:
- flatMapContact 连接模式
- flatMapMerge 合并模式
- flatMapLatest 最新展平模式
fun testFlatMapConcat() = runBlocking<Unit> {
val duration = measureTimeMillis {
(1..3)
.asFlow().flatMapConcat {
flow {
emit("$it first")
delay(500)
emit("$it second")
}
}.collect {
println(it)
}
}
println("total duration $duration")
}
//print result
//1 first
//1 second
//2 first
//2 second
//3 first
//3 second
//total duration 1596
fun testFlatMapMerge() = runBlocking<Unit> {
val duration = measureTimeMillis {
(1..3)
.asFlow().flatMapMerge {
flow {
emit("$it first")
delay(500)
emit("$it second")
}
}.collect {
println(it)
}
}
println("total duration $duration")
}
//print result
//1 first
//2 first
//3 first
//1 second
//2 second
//3 second
//total duration 765
5、流的异常处理
当运算符中的发射器或者代码抛出异常,有几种处理方式。
- try/catch块(处理收集时候异常)
- catch 函数(用于捕获上游异常)
流抛出异常的地方分别在发射(emit)和收集(collect)时候。
fun testHandleException() = runBlocking<Unit> {
flow<Int> {
emit(1)
throw ArithmeticException("Num error")
}.catch { e: Throwable ->
println("catch $e")
emit(-1)
}.flowOn(Dispatchers.IO)
.collect {
println("$it")
}
}
//print result
//catch java.lang.ArithmeticException: Num error
//1
//-1
6、onCompletion 流的完成
onCompletion操作符在flow中类似finally的作用,但是可以获取到异常
fun testOnCompletion() = runBlocking<Unit> {
flow<Int> {
emit(1)
throw ArithmeticException("Num error")
}
.onCompletion {exception->
//可以获取到异常信息,但是不捕获异常
println("onCompletion $exception")
}
.catch { e:Throwable->
println("catch $e")
}
.flowOn(Dispatchers.IO)
.collect {
println("$it")
}
}