RXJava操作符——创建类

RXJava 操作符——创建类

create

创建一个 Observable 对象,这个操作符接收一个 Observer 对象,可以通过重写 Observe 接口中的方法自行控制 onNext,onError,onComplete 方法调用。create 不会默认在任何特定的调度器上运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
// 在这里写控制 onNext,onError,onComplete方法的调用逻辑
try {
// 在操作前判断订阅的状态以免 Observable 做无用功
if (!subscriber.isUnsubscribed()) {
for (int i = 0; i < 5; i++) {
System.out.println("emit:" + i);
subscriber.onNext(i);
}
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
});

输出结果:

1
2
3
4
5
6
7
8
9
10
11
emit:0
onNext:0
emit:1
onNext:1
emit:2
onNext:2
emit:3
onNext:3
emit:4
onNext:4
completed

在1.2.7版本中,create(Observable.OnSubscribe f) 方法被废弃,改用 unsafeCreate(Observable.OnSubscribe f),二者源码实现一致。create 还有一些重载方法可以使用。

defer

延迟执行某个操作符直到 Observable 被订阅,并且为每一个 Observe 对象创建一个新的 Observable,这样能保证 Observer 能接收到完整的数据。

使用场景,参考:Deferring Observable code until subscription in RxJava

empty/never/error

这三个操作符都不会发送任何对象,多用于测试

  • empty:创建一个不发送任何对象的 Observable 并且立即调用 Observe 的 onCompleted() 方法;
  • never:创建一个就发送任何对象也不发送任何通知的 Observable 对象;
  • error:创建一个不发送任何对象的 Observable 对象,当 Observable 被订阅时调用 Observe 的 onError() 方法。

from

将传入的数组或 Iterable 对象转换成依次发送其中子项的 Observable 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
String[] args = {"tim", "john", "sdf"};
Observable.from(args).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
System.out.println("onError:" + e.getMessage());
}
@Override
public void onNext(String s) {
// from 将传入的 String 数组拆分成 String 单个发送
System.out.println("onNext:");
System.out.println(s);
}
});

输出结果:

1
2
3
4
5
6
7
onNext:
tim
onNext:
john
onNext:
sdf
completed

just

创建一个 Observable 对象,将传入的参数依次发送,最多可接收 10 个参数。

该操作符与 from 类似,不同的是 from 将传入的数组拆分成具体的子项一次发送一个,而 just 则是将传入的数组当成一个对象一次发送出去。just 可传入 null,此时 Observable 对象将 null 作为一个对象发送出去而不是产生一个不发送对象的 Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
String[] args = {"tim", "john", "sdf"};
Observable.just(args).subscribe(new Observer<String[]>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
System.out.println("onError:" + e.getMessage());
}
@Override
public void onNext(String[] s) {
// just 将传入的 String 数组作为一个对象发送
System.out.println("onNext:");
for (String arg : s) {
System.out.println(arg);
}
}
});

输出结果可以看到,from 操作符执行了 3 次 onNext,而 just 操作符只执行一次 onNext:

1
2
3
4
5
onNext:
tim
john
sdf
completed

interval

创建一个 Observable 对象,以传入参数为间隔,发送从 0 开始递增 1 的 long 类型的无穷序列。默认在 computation 调度器中运行,可自定义调度器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
System.out.println("onError:" + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("onNext:" + aLong);
}
});

由于调度器运行在守护线程,如果只在 main 方法中调用,需要让主线程 sleep 才能看到输出,以下为 sleep 10s 的结果:

1
2
3
4
5
onNext:0
onNext:1
onNext:2
onNext:3
onNext:4

range

interval 操作符类似,range(n, m) 创建一个 Observable 对象,发送从 n 开始,长度为 m 的序列。区别在于 range 发送的是 Integer 类型的整数,且默认不在任何特定调度器中运行,但可指定其运行的调度器。

若 m 为 0,则不发送对象并调用 onCompleted,若 m 为负数,则抛出异常,调用 onError。

repeat

该操作符不能创建一个 Observable 对象,可以重复发送上游 Observable 对象的 item,默认运行在 trampoline 调度器中,可指定其运行的调度器及重复次数。

timer

在指定时间后,返回一个发送 0L 对象的 Observable,随后调用 completed 方法。

呼啦啦...