RXJava操作符——转换类

RXJava操作符——转换类

buffer

Observable 的缓存区,源 Observable 发送出来的对象会进过缓存区,当 buffer 里缓存了一定数量的对象,则将它们以 List 的形式发送出去,当源 Observable 发送出 onCompleted 或 onError 通知时,无论当前 buffer 中的对象数量是否足够都会将这个 buffer 发送出去并且传递 onCompleted 或 onError 通知。

flatmap

将源 Observable 对象转换成多个 Observable 对象并将其展开,再将得到的子项发送出去。由于最终将子项合并到到一个 Observable 对象,所以这些子项的顺序可能是相互交错的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.just(user1, user2, user3).flatMap(new Func1<User, Observable<User.Book>>() {
@Override
public Observable<User.Book> call(User user) {
System.out.println(user.getName());
return Observable.from(user.getBooks());
}
}).subscribe(new Observer<User.Book>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(User.Book book) {
System.out.println("bookName:" + book.getBookName() + "->price:" + book.getBokPrice());
}
});

上例中 User 中包含一个 Book 列表,通过 flatmap 操作符可将 User 展开得到 Book 的信息。输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
John
bookName:book1->price:20
bookName:book3->price:35
bookName:book5->price:45
Tim
bookName:book1->price:20
bookName:book2->price:23
bookName:book4->price:43
Amy
bookName:book2->price:23
bookName:book4->price:43
completed

groupBy

将源 Observable 对象发送的子项按自定义的规则进行分类输出。

map

对源 Observable 对象发送的子项进行某个特定操作后继续将其发送出去。

scan

累加器,将源 Observable 发送的第一个对象继续发送,从第二个开始,将前一个发送的结果返回到回调方法中同当前对象一起处理后再发送出去。与之类似的还有一个 reduce 操作符,区别在于 reduce 只输出最终结果,而 scan 会输出过程中得到的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.just(1, 2, 3, 4, 5)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum + item;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("onNext: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("onError: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});

输出:

1
2
3
4
5
6
Next: 1
Next: 3
Next: 6
Next: 10
Next: 15
Sequence complete.

window

类似于 buffer, 不同的是,window 在发送一个缓存区的时候是以 Observable 的形式发送的,即每发送一次都是一个完整的 Observable,会调用 onCompleted 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Observable.just(1,2,3,4,5,6).window(2).subscribe(new Observer<Observable<Integer>>() {
@Override
public void onCompleted() {
System.out.println("onComleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Observable<Integer> integerObservable) {
integerObservable.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
System.out.println("onError:" + e.getMessage());
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext:" + integer);
}
});
}
});

输出结果:

1
2
3
4
5
6
7
8
9
10
onNext:1
onNext:2
completed
onNext:3
onNext:4
completed
onNext:5
onNext:6
completed
onComleted
呼啦啦...