前言
在学习RxJava 的过程中,每次想使用Rx的时候都不知道 应该如何下手,以此篇文章记录 备忘。
一、RxJava的重要组成
Rx 主要 Observable(可被观察者 —>被观察者) 和 Observer(观察者) ,
然后观察者和被观察者 通过 subscribe 产生关系,
通过 unSubscribe 解除产生关系。
Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。
二、Observable创建
1、使用 Observable.create() 创建:
create() 使用OnSubscribe从头创建一个Observable,这种方法比较简单。需要注意的是,使用该方法创建时,建议在OnSubscribe#call方法中检查订阅状态,以便及时停止发射数据或者运算。
// 1、通過 Observable.create 创建
Observable observable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 3; i++) {
subscriber.onNext("hello Rx");
}
subscriber.onCompleted();
}
});
// 2、调用subscribe()方法对Observable订阅。订阅以后,我们开始接收Observable 发送的数据(即定义的String)。
observable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d("--onCompleted --","");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(String s) {
Log.d("--onNext --",s);
}
});
注意:
如果 要想保证 观察者成功 获得数据 其接受的类型 ,需要和发送的类型保持一致
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
2、使用 Observable.from() 创建:
将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个Observable。Future转换为OnSubscribe是通过OnSubscribeToObservableFuture进行的,Iterable转换通过OnSubscribeFromIterable进行。数组通过OnSubscribeFromArray转换。
//2、通过 Observable.from
private void creatObservable2(){
List<String> stringList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
stringList.add(""+i);
}
Observable observable = Observable.from(stringList);
observable.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d("123","--->onCompleted");
}
@Override
public void onError(Throwable throwable) {
Log.d("123","--->onError--"+throwable.toString());
}
@Override
public void onNext(String s) {
Log.d("123","--->onNext--"+s);
}
});
}
from()创建符可以从一个列表/数组来创建Observable,并一个接一个的从列表/数组中发射出来每一个对象,或者也可以从JavaFuture类来创建Observable,并发射Future对象的.get()方法返回的结果值。传入Future作为参数时,我们可以指定一个超时的值。Observable将等待来自Future的结果;如果在超时之前仍然没有结果回,Observable将会触发onError()方法通知观察者有错误发生了。
3、使用 Observable.just()创建:
Observable.just()方法主要用于将Java对象转变为Observable对象发射出去。
//3、通过 Observable.from
private void creatObservable3(){
Observable observable = Observable.just(1,2,3,4);
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d("123","--->onCompleted--");
}
@Override
public void onError(Throwable throwable) {
Log.d("123","--->onError--"+throwable.toString());
}
@Override
public void onNext(Integer i) {
Log.d("123","--->onNext--"+i);
}
});
}
4、empty / error / never:
创建具有非常精确和有限行为的可观察性,这些用于测试目的,有时也用于将其他可观测值和其他可观测值作为参数的操作符组合。
empty :
创建一个什么都不做直接通知完成的Observable
/**
* empty :创建一个什么都不做直接通知完成的Observable
*/
private static void empty(){
Observable emptyObs = Observable.empty();//直接调用onCompleted。
emptyObs.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted:--->");
}
@Override
public void onError(Throwable e) {
System.out.println("onError:--->");
}
@Override
public void onNext(String s) {
System.out.println("onNext:--->"+s);
}
});
}
error :
创建一个什么都不做直接通知错误的Observable
/**
* error : 创建一个什么都不做直接通知错误的Observable
*/
private static void error(){
Observable emptyObs = Observable.error(new NullPointerException());//直接调用onError。
emptyObs.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted:--->");
}
@Override
public void onError(Throwable e) {
System.out.println("onError:--->"+e.toString());//NullPointerException
}
@Override
public void onNext(String s) {
System.out.println("onNext:--->"+s);
}
});
}
never:
创建一个什么都不做的Observable
/**
* never : 创建一个什么都不做的Observable
*/
private static void never(){
Observable emptyObs = Observable.never();
emptyObs.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted:--->");
}
@Override
public void onError(Throwable e) {
System.out.println("onError:--->"+e.toString());//NullPointerException
}
@Override
public void onNext(String s) {
System.out.println("onNext:--->"+s);
}
});
}
5、Interval:
创建一个按照给定的时间间隔发射从0开始的整数序列的Observable,内部通过OnSubscribeTimerPeriodically工作。
/**
* interval : 使用interval( ),创建一个按固定时间间隔发射整数序列的Observable,可用作定时器:
*/
private static void intervalTest(){
/**
* 参数1 :initialDelay 在发出第一个值之前,等待的初始延迟时间
* 参数2 :period 发送时间间隔
* 参数3 :unit 时间单位
* 参数4 :Scheduler 等待发生和项目的调度程序 ???
*/
Observable.interval(1,3,TimeUnit.SECONDS, Schedulers.trampoline())//等待 1秒后 ,每隔3秒执行一次
.subscribe(new Action1<Long>() { // 注意 是Long 类型
@Override
public void call(Long l) {
System.out.println("Action1:--->"+l);
}
});
// try {
// Thread.sleep(20_000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
注意:在使用 java程序测试 运行时,需要添加Schedulers.trampoline() 调度器,
Observable.interval()不起作用解决方法
6、timer:
创建一个在给定的延时之后发射数据项为0的Observable,内部通过OnSubscribeTimerOnce工作
/**
* Timer : 创建一个可观察的,在给定的延迟之后发出特定的项目
*/
private static void timerTest(){
Observable.timer(2,TimeUnit.SECONDS,Schedulers.trampoline()).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("timerTest--->");
}
});
}
7、range:
会发出一系列顺序整数,按顺序排列,在这里您选择范围的开始和长度
/**
* Range : 会发出一系列顺序整数,按顺序排列,在这里您选择范围的开始和长度。
*/
private static void rangeTest(){
Observable.range(2,9,Schedulers.trampoline())// 从2 开始 每次加1, 输出 9次,
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("rangeTest-->"+integer.toString());
}
});
}
8、repeat / repeatWhen:
repeat : 重复操作符会多次发出一个条目。这个操作符的一些实现允许您重复一系列的项目,一些允许您限制重复的次数。
repeatWhen :
/**
* repeat : 重复操作符会多次发出一个条目。这个操作符的一些实现允许您重复一系列的项目,一些允许您限制重复的次数。
*/
private static void repeatTest(){
Observable.range(2,3,Schedulers.trampoline())// 从2 开始 每次加1, 输出 9次,
.repeat(3)//重复3次 .repeat()接收到.onCompleted()事件后触发重订阅。
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted-->");
}
@Override
public void onError(Throwable e) {
System.out.println("onCompleted-->");
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext-->"+integer);
}
});
Observable.range(2,3,Schedulers.trampoline())// 从2 开始 每次加1, 输出 9次,
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return observable.delay(4,TimeUnit.SECONDS,Schedulers.trampoline());
}
},Schedulers.trampoline())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("repeatTest-->"+integer.toString());
}
});
}
参考文章:
RxJava-简介及Observable创建