RxJava是一套响应式编程技术,实现业务和逻辑上的链式编程。RxJava和RxJava2的使用会有一些区别,这篇博客基于RxJava2。我将总结一下RxJava2的使用,整理一下RxJava2常用的操作符,针对每个操作符,都会有相应的代码,帮助我们理解。
一.RxJava概论
RxJava也是基于观察者模式:被观察者完成计算任务,观察者根据被观察者的计算结果作出响应,两者通过subscribe绑定在一起,实现链式的数据流动。RxJava有两个大版本,RxJava1.x和RxJava2.x,两个版本的RxJava在使用上有很多不同的地方,但基本的设计思想是一致的。
在Android中,对于网络请求,文件或者数据库的读写等耗时操作,我们会放在后台线程去操作,执行完成后通知UI更新,Android自带的Handler,AsyncTask等均是如此实现的。因此,执行任务和UI更新需要切换线程。在RxJava中,线程切换通过两个关键函数subscribeOn和observeOn来实现。
1.subscribeOn
subscribeOn用来规定任务执行的线程,RxJava提供了一系列线程类型来满足不同的使用场景。
(1)Schedulers.newThread()
新创建一个普通的线程,执行任务。
(2)Schedulers.io()
I/O类型的线程,例如读写文件和数据库,访问网络请求等,均可以使用该线程。与newThread不同的是,io内部维护着线程池,资源分配和利用上更好。这也是最常用的线程类型。
(3)Schedulers.computation()
计算密集型线程,例如游戏图像的高频率渲染,复杂的数学计算等。
2.observeOn
obserbeOn用来配置观察者所在的线程类型。Android中常见的场景:非UI线程执行耗时操作,UI线程接收后台的计算结果来更新UI。RxJava配置观察者所在线程一般如下:
.observeOn(AndroidSchedulers.mainThread())
二.常用操作符
1.just操作符
just操作符用来连续发射多个对象组成的序列数据流,最多为10个。just操作符的基本使用如下:
private void testJust() {
LoginBean bean1 = new LoginBean();
bean1.setResult("true");
bean1.setUsername("a");
bean1.setPassword("111111");
Observable.just("用户信息", 2, bean1, true, 5, 6, 7, 8, 9, 10)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Serializable>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Serializable serializable) {
if (serializable instanceof LoginBean) {
Log.d("TTTT", ((LoginBean) serializable).getUsername() + "");
} else {
Log.d("TTTT", serializable.toString());
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
我们看一下最终的输出结果,可以看到,是按照我们输入的顺序连续发射的:
2.interval和intervalRange操作符
interval和intervalRange两个操作符用来实现周期性的发射数据流。interval操作符是无范围连续周期性的发送数据,不会停止。而intervalRange操作符会在到了某个范围后,停止发射数据。
(1)interval操作符
如上所述,interval操作符周期性的发送数据,不会因为到了某个范围而停止。interval的定义是interval(long initialDelay, long period, TimeUnit unit)。第一个参数为延迟时间,第二个参数是时间周期,第三个参数是时间单位。interval操作符的使用如下:
private void testInterval() {
Observable.interval(0, 1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d("TTTT", aLong + "");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
我们看一下输出的结果,可以发现,会一直周期性的不断的发送数据,不会停止:
(2)intervalRange操作符
intervalRange操作符周期性的发射数据流,达到预先定下的范围会停止。intervalRange操作符的定义intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)。第一个参数是开始的值,第二个参数是连续发送的值的数量(也就是范围,到了这个数量将会停止),第三个参数是延迟的时间,第四个参数是时间周期,最后一个参数是时间单位。intervarRange的使用如下:
private void testIntervalRange() {
Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d("TTTT", aLong + "");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
我们看一下最终的输出结果,当发送完10个数据后,自动停止:
三.map和flatMap操作符
上面介绍的操作符,输入的数据是什么,输出的数据也是一样的。接下来,将介绍一下数据流的转换,也就是把发射出的数据转换为另一种形式输出。在RxJava中,map和flatMap就是实现数据的转换。
1.map操作符
map操作符的使用,需要通过一个转换方法Function来实现数据的转换。例如,我们实现一个简单的需求,把输入的小写的字符串转换为大写并输出,使用map操作符实现如下:
private void testMap() {
mFunction = new io.reactivex.functions.Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s.toUpperCase();
}
};
Observable.just("asdfghjkl")
.map(mFunction)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("TTTT", s + "");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
我们看一下输出的结果,是不是小写转换为大写输出:
2.flatMap操作符
map是把一个数据对象经过加工,变为另一个数据对象输出。而flatMap操作符是把一个集合元素对象拆分成单个元素对象作为输入,也是依赖于Function,flatMap的使用如下:
private void testFlatMap() {
mFunction2 = new io.reactivex.functions.Function<List<String>, Observable<String>>() {
@Override
public Observable<String> apply(List<String> strings) throws Exception {
String[] s = new String[strings.size()];
for (int i = 0; i < strings.size(); i++) {
s[i] = strings.get(i);
}
return Observable.fromArray(s);
}
};
List<String> mList = new ArrayList<>();
mList.add("aaa");
mList.add("bbb");
mList.add("ccc");
Observable.just(mList)
.flatMap(mFunction2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("TTTT", s + "");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
我们看一下最后的输出结果,是不是拆分开输出的:
四.zip,merge和concat操作符
这三个操作符,可以把若干个被观察者整合为一个被观察者,他们三个存在细微的差别,接下来,将总结一下这三个操作符的使用。
1.zip操作符
zip操作符的目的是合并两个被观察者,并最终发射一个被观察者作为结果输出。zip的实现是通过BiFunction类,通过其apply函数,对两个被观察者的数据进行合并,zip操作符的使用如下:
private void testZip() {
observable1 = Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "aaa";
}
});
observable2 = Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "bbb";
}
});
mBiFunction = new BiFunction<String, String, String>() {
@Override
public String apply(String s1, String s2) throws Exception {
return s1 + ";" + s2;
}
};
Observable.zip(observable1, observable2, mBiFunction)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("TTTT", s + "");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
我们看一下,最后的输出结果是不是aaa;bbb:
2.merge操作符和concat操作符
merge操作符和zip操作符的作用类似,merge是合并若干个被观察者,然后线性的输出结果。concat操作符的使用和merge一样,两者的区别在于:merge不保证输出的顺序,而concat保证结果按照合并的顺序输出。merge操作符的使用如下:
private void testMerge() {
observable1 = Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "a";
}
});
observable2 = Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "b";
}
});
Observable.merge(observable1, observable2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("TTTT", s + "");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
我们看一下输出的结果:
五.scan和filter操作符
1.scan操作符
scan操作符将被观察者的结果在BiFuction类中扫描一遍后,筛选出符合条件的结果交给观察者使用,scan操作符的实现关键也在于BiFunction类。例如,我们想要实现依次输入0-4五个数字,然后输出最大的数字,我们scan操作符的实现如下:
private void testScan() {
mBiFunction2 = new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer i, Integer i2) throws Exception {
return Math.max(i, i2);
}
};
Observable.range(0, 5)
.scan(mBiFunction2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d("TTTT", integer + "");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
我们看一下,最后的输出结果是不是正确的。一开始输入0,0就是最大的,后来输入1,0和1比较,1最大,依次进行:
2.filter操作符
filter操作符实现按照一定的条件过滤和筛选数据集,输出最终的结果。filter实现的关键是依赖Predicate的test方法,如果test返回true,说明通过筛选,否则没通过筛选。例如,我们输入0-4五个数字,最后只输出偶数,那么,filter操作符的实现如下:
private void testFilter() {
Observable.range(0, 5)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
if (integer % 2 == 0) {
return true;
} else {
return false;
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d("TTTT",integer+"");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
我们看一下运行结果,是不是筛选出了0,2,4三个数作为输出结果:
以上是对RxJava2常用的一些操作符的总结,还有一些其他的操作符我没总结到,今天先到此为止,在后面的时间,我会继续补充和完善。