RXJava 操作符——创建类
创建一个 Observable
对象,这个操作符接收一个 Observer
对象,可以通过重写 Observe 接口中的方法自行控制 onNext
,onError
,onComplete
方法调用。create
不会默认在任何特定的调度器上运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { try { if (!subscriber.isUnsubscribed()) { for (int i = 0; i < 5; i++) { System.out.println("emit:" + i); subscriber.onNext(i); } subscriber.onCompleted(); } } catch (Exception e) { subscriber.onError(e); } } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("completed"); }
@Override public void onError(Throwable throwable) { System.out.println(throwable.getMessage()); }
@Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); } });
|
输出结果:
1 2 3 4 5 6 7 8 9 10 11
| emit:0 onNext:0 emit:1 onNext:1 emit:2 onNext:2 emit:3 onNext:3 emit:4 onNext:4 completed
|
在1.2.7版本中,create(Observable.OnSubscribe f) 方法被废弃,改用 unsafeCreate(Observable.OnSubscribe f),二者源码实现一致。create
还有一些重载方法可以使用。
延迟执行某个操作符直到 Observable 被订阅,并且为每一个 Observe 对象创建一个新的 Observable,这样能保证 Observer 能接收到完整的数据。
使用场景,参考:Deferring Observable code until subscription in RxJava
这三个操作符都不会发送任何对象,多用于测试
- empty:创建一个不发送任何对象的 Observable 并且立即调用 Observe 的
onCompleted()
方法;
- never:创建一个就发送任何对象也不发送任何通知的 Observable 对象;
- error:创建一个不发送任何对象的 Observable 对象,当 Observable 被订阅时调用 Observe 的
onError()
方法。
将传入的数组或 Iterable
对象转换成依次发送其中子项的 Observable 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| String[] args = {"tim", "john", "sdf"}; Observable.from(args).subscribe(new Observer<String>() { @Override public void onCompleted() { System.out.println("completed"); }
@Override public void onError(Throwable e) { System.out.println("onError:" + e.getMessage()); }
@Override public void onNext(String s) { System.out.println("onNext:"); System.out.println(s); } });
|
输出结果:
1 2 3 4 5 6 7
| onNext: tim onNext: john onNext: sdf completed
|
创建一个 Observable 对象,将传入的参数依次发送,最多可接收 10 个参数。
该操作符与 from 类似,不同的是 from 将传入的数组拆分成具体的子项一次发送一个,而 just 则是将传入的数组当成一个对象一次发送出去。just 可传入 null,此时 Observable 对象将 null
作为一个对象发送出去而不是产生一个不发送对象的 Observable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| String[] args = {"tim", "john", "sdf"}; Observable.just(args).subscribe(new Observer<String[]>() { @Override public void onCompleted() { System.out.println("completed"); }
@Override public void onError(Throwable e) { System.out.println("onError:" + e.getMessage());
}
@Override public void onNext(String[] s) { System.out.println("onNext:"); for (String arg : s) { System.out.println(arg); } } });
|
输出结果可以看到,from
操作符执行了 3 次 onNext,而 just
操作符只执行一次 onNext:
1 2 3 4 5
| onNext: tim john sdf completed
|
创建一个 Observable 对象,以传入参数为间隔,发送从 0 开始递增 1 的 long 类型的无穷序列。默认在 computation 调度器中运行,可自定义调度器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Observable.interval(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onCompleted() { System.out.println("completed"); }
@Override public void onError(Throwable e) { System.out.println("onError:" + e.getMessage());
}
@Override public void onNext(Long aLong) { System.out.println("onNext:" + aLong); } });
|
由于调度器运行在守护线程,如果只在 main 方法中调用,需要让主线程 sleep 才能看到输出,以下为 sleep 10s 的结果:
1 2 3 4 5
| onNext:0 onNext:1 onNext:2 onNext:3 onNext:4
|
与 interval
操作符类似,range(n, m) 创建一个 Observable 对象,发送从 n 开始,长度为 m 的序列。区别在于 range
发送的是 Integer 类型的整数,且默认不在任何特定调度器中运行,但可指定其运行的调度器。
若 m 为 0,则不发送对象并调用 onCompleted,若 m 为负数,则抛出异常,调用 onError。
该操作符不能创建一个 Observable 对象,可以重复发送上游 Observable 对象的 item,默认运行在 trampoline 调度器中,可指定其运行的调度器及重复次数。
在指定时间后,返回一个发送 0L 对象的 Observable,随后调用 completed 方法。