0%

RXJava笔记——相关术语

RXJava相关术语

Upstream(上游),DownStream(下游)

RXJava 中的数据流由数据源和 0 或多个连续的立即执行的操作或组合操作组成:

1
2
3
source.operator1().operator2().operator3().subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

如上,如当前处于operation2,则左侧为upstream,右侧为downstream。

运动中的对象

在 RXJava 中,emission, emits, item, event, signal, data 和 message 都表示沿着流传播的对象。

Backpressure(背压)

当数据流运行在多个线程中,数据流中每一个操作处理的速度可能会不同,如果上游处理速度比下游快,则会造成堵塞。通常表现为由与暂时缓存或需要跳过、抛弃部分数据而增加内存使用。这时可以使用背压策略解决,即当前操作可以告诉上游它可以处理多少事件,在无法预知上游会发送多少事件过来的情况下可以控制内存的使用。

在 RXJava 中,Flowable 专为背压设计并且 Observable 专为无背压操作设计。

Assembly time

通过各种操作符来准备数据流的时间,在 subscribe() 发生之前。

1
2
3
4
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v* v)
.filter(v -> v % 3 == 0)
;

在这种状态下没有副作用发生。

Subscription time

subscribe() 被调用时所处的状态,是一种临时状态,此时会内部建立有各种操作符组成的操作链。产生订阅副作用,在这种状态下部分数据源会立即阻断或开始发送事件。

Runtime

流主动发送事件、错误或已完成信号时的状态。几乎是整个操作过程执行的时间。

简单的后台计算

RXJava 采用流式 API,类似构建器模式,不同的是 RXJava 返回的类型是不变的。通过 subscribeOn 将耗时操作转移到其他线程,当数据准备好时,通过 observeOn 转移到 UI 线程显示。

Schedulers(调度器)

RXJava 使用 Schedulers (调度器)处理并发问题,RXJava 中,Schedulers 运行在守护线程中,意味着只要 main 方法存在,它们都将停止并且后台任务可能永远不会执行。

  • Schedulers.computation():在后台固定数量的专用线程中处理密集型计算。大部分异步操作符的默认调度器。
  • Schedulers.io():在一组动态变换的线程中运行类 I/O 或易阻塞的操作。
  • Schedulers.single():以 FIFO 的方式在单一线程中运行操作
  • Schedulers.trampoline():以 FIFO 的方式在当前线程中运行操,常用于测试

以上四种调度器在所有 JVM 中都可用,某些特定平台也有自己的调度器,如 Android 的 AndroidSchedulers.mainThread()

流中的并发

流的本质是顺序的,但是可分为不同的处理过程,而且相互之间可能存在并发

1
2
3
4
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);

以上示例中,在 computation scheduler 中从 1 到 10 计算其平方,在调用 blockingSubscribe 的线程中输出结果。其中 map(v -> v * v) 并不是并行的,而是在同一个 computation 线程中顺序执行。

并行处理

1
2
3
4
5
6
7
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);

RXJava 中的并行意味着在相互独立的流中执行操作并将结果合并到一个流中。操作符 flatMap 实现了并行,但是不能保证最终合并的结果是按顺序输出的,即内部流的顺序可能会相互交错。替代操作符:

  • concatMap:一次运行一个内部流
  • concatMapEager:立即运行所有内部流,按照内部流创建的顺序输出