当前位置: 首页>移动开发>正文

rxjava 有新消息时取消旧的观察 rxjava subscribeon observeon

前言

在学习RxJava 的过程中,每次想使用Rx的时候都不知道 应该如何下手,以此篇文章记录 备忘。

一、RxJava的重要组成

Rx 主要 Observable(可被观察者 —>被观察者) 和 Observer(观察者) ,

然后观察者和被观察者 通过 subscribe 产生关系,

通过 unSubscribe 解除产生关系。

Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。

二、Observable创建

1、使用 Observable.create() 创建:

create() 使用OnSubscribe从头创建一个Observable,这种方法比较简单。需要注意的是,使用该方法创建时,建议在OnSubscribe#call方法中检查订阅状态,以便及时停止发射数据或者运算。

// 1、通過 Observable.create 创建
Observable observable =  Observable.create(
    new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

                for (int i = 0; i < 3; i++) {
                    subscriber.onNext("hello Rx");
                }
                subscriber.onCompleted();
            }
        });

// 2、调用subscribe()方法对Observable订阅。订阅以后,我们开始接收Observable          发送的数据(即定义的String)。

observable.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.d("--onCompleted --","");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String s) {
                Log.d("--onNext --",s);
            }
        });

注意:
如果 要想保证 观察者成功 获得数据 其接受的类型 ,需要和发送的类型保持一致
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer

2、使用 Observable.from() 创建:

将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个Observable。Future转换为OnSubscribe是通过OnSubscribeToObservableFuture进行的,Iterable转换通过OnSubscribeFromIterable进行。数组通过OnSubscribeFromArray转换。

//2、通过 Observable.from
    private void creatObservable2(){
        List<String> stringList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            stringList.add(""+i);
        }

        Observable observable = Observable.from(stringList);
        observable.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.d("123","--->onCompleted");
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d("123","--->onError--"+throwable.toString());
            }

            @Override
            public void onNext(String s) {
                Log.d("123","--->onNext--"+s);
            }
        });

    }

from()创建符可以从一个列表/数组来创建Observable,并一个接一个的从列表/数组中发射出来每一个对象,或者也可以从JavaFuture类来创建Observable,并发射Future对象的.get()方法返回的结果值。传入Future作为参数时,我们可以指定一个超时的值。Observable将等待来自Future的结果;如果在超时之前仍然没有结果回,Observable将会触发onError()方法通知观察者有错误发生了。

3、使用 Observable.just()创建:

Observable.just()方法主要用于将Java对象转变为Observable对象发射出去。

//3、通过 Observable.from
    private void creatObservable3(){
        Observable observable =  Observable.just(1,2,3,4);
        observable.subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("123","--->onCompleted--");
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d("123","--->onError--"+throwable.toString());
            }

            @Override
            public void onNext(Integer i) {
                Log.d("123","--->onNext--"+i);
            }
        });

    }
4、empty / error / never:
创建具有非常精确和有限行为的可观察性,这些用于测试目的,有时也用于将其他可观测值和其他可观测值作为参数的操作符组合。

empty :

创建一个什么都不做直接通知完成的Observable

/**
     * empty :创建一个什么都不做直接通知完成的Observable
     */
    private static void empty(){
        Observable emptyObs = Observable.empty();//直接调用onCompleted。
        emptyObs.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted:--->");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError:--->");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext:--->"+s);
            }
        });
    }

error :

创建一个什么都不做直接通知错误的Observable

/**
     * error : 创建一个什么都不做直接通知错误的Observable
     */
    private static void error(){
        Observable emptyObs = Observable.error(new NullPointerException());//直接调用onError。
        emptyObs.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted:--->");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError:--->"+e.toString());//NullPointerException
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext:--->"+s);
            }
        });
    }

never:

创建一个什么都不做的Observable

/**
     * never : 创建一个什么都不做的Observable
     */
    private static void never(){
        Observable emptyObs = Observable.never();
        emptyObs.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted:--->");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError:--->"+e.toString());//NullPointerException
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext:--->"+s);
            }
        });
    }
5、Interval:

创建一个按照给定的时间间隔发射从0开始的整数序列的Observable,内部通过OnSubscribeTimerPeriodically工作。

/**
     * interval : 使用interval( ),创建一个按固定时间间隔发射整数序列的Observable,可用作定时器:
     */
    private static void intervalTest(){
        /**
         * 参数1 :initialDelay 在发出第一个值之前,等待的初始延迟时间
         * 参数2 :period 发送时间间隔
         * 参数3 :unit 时间单位
         * 参数4 :Scheduler 等待发生和项目的调度程序 ???
         */
       Observable.interval(1,3,TimeUnit.SECONDS, Schedulers.trampoline())//等待 1秒后 ,每隔3秒执行一次
                .subscribe(new Action1<Long>() { // 注意 是Long 类型
            @Override
            public void call(Long l) {
                System.out.println("Action1:--->"+l);
            }
        });
//        try {
//            Thread.sleep(20_000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
    }

注意:在使用 java程序测试 运行时,需要添加Schedulers.trampoline() 调度器,

Observable.interval()不起作用解决方法

6、timer:

创建一个在给定的延时之后发射数据项为0的Observable,内部通过OnSubscribeTimerOnce工作

/**
     * Timer : 创建一个可观察的,在给定的延迟之后发出特定的项目
     */
    private static void timerTest(){
        Observable.timer(2,TimeUnit.SECONDS,Schedulers.trampoline()).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                System.out.println("timerTest--->");
            }
        });
    }
7、range:

会发出一系列顺序整数,按顺序排列,在这里您选择范围的开始和长度

/**
     * Range : 会发出一系列顺序整数,按顺序排列,在这里您选择范围的开始和长度。
     */
    private static void rangeTest(){
        Observable.range(2,9,Schedulers.trampoline())// 从2 开始 每次加1, 输出 9次,
                .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("rangeTest-->"+integer.toString());
            }
        });
    }
8、repeat / repeatWhen:

repeat : 重复操作符会多次发出一个条目。这个操作符的一些实现允许您重复一系列的项目,一些允许您限制重复的次数。

repeatWhen :

/**
     * repeat : 重复操作符会多次发出一个条目。这个操作符的一些实现允许您重复一系列的项目,一些允许您限制重复的次数。
     */
    private static void repeatTest(){
        Observable.range(2,3,Schedulers.trampoline())// 从2 开始 每次加1, 输出 9次,
                .repeat(3)//重复3次 .repeat()接收到.onCompleted()事件后触发重订阅。
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted-->");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onCompleted-->");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext-->"+integer);
                    }
                });



        Observable.range(2,3,Schedulers.trampoline())// 从2 开始 每次加1, 输出 9次,
                .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                    @Override
                    public Observable<?> call(Observable<? extends Void> observable) {

                        return observable.delay(4,TimeUnit.SECONDS,Schedulers.trampoline());
                    }
                },Schedulers.trampoline())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("repeatTest-->"+integer.toString());
                    }
                });
    }

参考文章:
RxJava-简介及Observable创建



https://www.xamrdz.com/mobile/4ys1963382.html

相关文章: