当前位置: 首页>移动开发>正文

Java reactor flux java reactor flux类型


文章目录

  • java异步01——初识reactor
  • reactor介绍
  • Flux
  • Mono
  • 简单看一下Flux和Mono的api
  • 订阅
  • generate
  • create
  • handle
  • Schedulers
  • 线程模型
  • 异常处理
  • 常规处理
  • transform操作符


java异步01——初识reactor

reactor介绍

Reactor是一个面向JVM(这意味着支持kotlin或者其他语言)的非阻塞响应式编程,具有高效的demand管理(以管理“背压”的形式)。它直接与Java 8功能(lambda、stream)集成在一起,特别是CompletableFuture,Stream和Duration。

Flux

Flux 是标准的Publisher,它表示0到N个发射项目的异步序列。会发出:OnNext,OnError,onComplete三种信号。

Mono

只发送一项的Pubilisher,有一些运算符可以返回Flux。

使用一个空的Mono <Void>表示无值异步过程。

简单看一下Flux和Mono的api

flux

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");//直接赋值创建flux
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);//利用迭代器创建一个flux
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); //从5到7的flux,第二个参数表示产生的个数

mono

Mono<String> noData = Mono.empty(); //创建一个空mono
Mono<String> data = Mono.just("foo");//创建一个mono

订阅

只有订阅发生了,流才会开始

订阅接口有:

subscribe(); 

subscribe(Consumer<? super T> consumer); 

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);

也可以通过继承抽象类BaseSubscriber的实例来入参subscribe

public class ReactorTest {
	public static void main(String[] args){
		SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
		Flux<Integer> ints = Flux.range(1, 4);
		ints.subscribe(i -> System.out.println(i),
		    error -> System.err.println("Error " + error),
		    () -> {System.out.println("Done");},
		    s -> s.request(10));
		ints.subscribe(ss);
	}
}
class SampleSubscriber<T> extends BaseSubscriber<T> {
	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);//请求一个资源
	}
	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);//请求一个资源
	}
}

输出:

1
2
3
4
Done
Subscribed
1
2
3
4

通过输入Subscriber的方式来订阅事件的话,需要使用request来向数据源请求事件

generate

Flux.generate(Callable<S>, BiFunction<S, SynchronousSink<T>, S>)

其中Callable<S>是个回调函数,可以产生一个state,S就是State的类型

BiFunction<S, SynchronousSink<T>, S>是迭代递归函数,输入有两个输入:SSynchronousSink<T>S是state,SynchronousSink<T>是sink,最后一个是S,是返回值类型,也是一个state。

SynchronousSink<T>的API是:

public interface SynchronousSink<T> {
	void complete();//给出完成信号
	Context currentContext();//获取上下文
	void error(Throwable e);//抛出异常
	void next(T t);//定义flux中下一个元素
}

用来构造一个Flux,逐个产生元素

Flux<String> flux = Flux.generate(
	    () -> 0, 
	    (state, sink) -> {
	      sink.next("3 x " + state + " = " + 3*state); 
	      return state + 1; 
	    });

首先() -> 0产生一个state,然后这个state和sink作为每次迭代的入参,每次迭代的过程通过sink.next来指定下一个值,通过sink.complete标志迭代结束,然后返回一个更新的state,以便下次迭代根据state来生成新的值。值得说明的是,在一个generate中,sink.next不能调用两次。

create

这个方法用起来和generate差不多(看原理上是有不小差距,但是我从api上没有体会到),并且sink.next可以调用多次

例子:

public class ReactorTest {
	public static void main(String[] args){
		Flux<String> flux = Flux.create(sink->{
			for(int i = 0; i < 10; i++){
				sink.next("3 * " + i + " = " + (3*i));
			}
		});
		CoreSubscriber<String> coreSubscriber = new SampleSubscriber();
		flux.subscribe(coreSubscriber);
	}
}
class SampleSubscriber<T> extends BaseSubscriber<T> {

	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}

handle

接在Flux已有的数据源上使用,可以看成是滤波

public class ReactorTest {
	public static void main(String[] args){
		Flux<String> flux = Flux.create(sink->{
			for(int i = 0; i < 10; i++){
				sink.next("3 * " + i + " = " + (3*i));
			}
		}).handle((i,sink)->{//进行过滤
			sink.next(i+" hello");
		});
		CoreSubscriber<String> coreSubscriber = new SampleSubscriber();
		flux.subscribe(coreSubscriber);
	}
}
class SampleSubscriber<T> extends BaseSubscriber<T> {

	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}

Schedulers

reactor中,可以通过选择schedulers来选择并发模型

  • 当前线程(Schedulers.immediate())
  • 可重用单线程(Schedulers.single())
  • 专一单线程(Schedulers.newSingle())
  • 弹性线程池(Schedulers.elastic())
  • 固定大小线程池(Schedulers.parallel())创建和cpu个数相同的线程

线程模型

  • publishOn强制下一个操作符运行在一个不同的线程上
  • subscribeOn强制上一个操作符运行在一个不同的线程上

异常处理

在响应式流中,错误是终止事件,他会沿着操作链向下传递,直到遇到OnError方法

常规处理

输入异常处理函数,例如:

public class ReactorTest {
	public static void main(String[] args){
		Flux<String> flux = Flux.create(sink->{
			for(int i = 0; i < 10; i++){
				sink.next("3 * " + i + " = " + (3*i));
			}
			sink.error(new IllegalAccessError());
		}).handle((i,sink)->{
			sink.next(i+" hello");
		});
		flux.subscribe(System.out::println, (t)->System.err.println("catch a error"));
	}
}

transform操作符

拼接处理操作符



https://www.xamrdz.com/mobile/4ps1957425.html

相关文章: