Reactor - 核心特征
# Reactor 核心特性
Reactor 项目的主要 artifact 是 reactor-core
,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。
Reactor 引入了实现 Publisher
的响应式类 Flux
和 Mono
,以及丰富的操作方式。 一个 Flux
对象代表一个包含 0..N 个元素的响应式序列,而一个 Mono
对象代表一个包含 零/一个(0..1)元素的结果。
这种区别为这俩类型带来了语义上的信息——表明了异步处理逻辑所面对的元素基数。比如, 一个 HTTP 请求产生一个响应,所以对其进行 count
操作是没有多大意义的。表示这样一个 结果的话,应该用 Mono<HttpResponse>
而不是 Flux<HttpResponse>
,因为要置于其上的 操作通常只用于处理 0/1 个元素。
有些操作可以改变基数,从而需要切换类型。比如,count
操作用于 Flux
,但是操作 返回的结果是 Mono<Long>
。
# Flux 包含 0-N 个元素的异步序列
Flux<T>
是一个能够发出 0 到 N 个元素的标准的 Publisher<T>
,它会被一个「错误(error)」 或「完成(completion)」信号终止。因此,一个 flux 的可能结果是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的 onNext
,onComplete
和onError
方法。
由于多种不同的信号可能性,Flux
可以作为一种通用的响应式类型。注意,所有的信号事件, 包括代表终止的信号事件都是可选的:如果没有 onNext
事件但是有一个 onComplete
事件, 那么发出的就是 空的 有限序列,但是去掉 onComplete
那么得到的就是一个 无限的 空序列。 当然,无限序列也可以不是空序列,比如,Flux.interval(Duration)
生成的是一个 Flux<Long>
, 这就是一个无限地周期性发出规律 tick 的时钟序列。
# Mono 异步的 0-1 结果
Mono<T>
是一种特殊的 Publisher<T>
, 它最多发出一个元素,然后终止于一个 onComplete
信号或一个 onError
信号。
它只适用其中一部分可用于 Flux
的操作。比如,(两个 Mono
的)结合类操作可以忽略其中之一 而发出另一个 Mono
,也可以将两个都发出,对于后一种情况会切换为一个 Flux
。
例如,Mono#concatWith(Publisher)
返回一个 Flux
,而 Mono#then(Mono)
返回另一个 Mono
。
注意,Mono
可以用于表示「空」的只有完成概念的异步处理(比如 Runnable
)。这种用 Mono<Void>
来创建。
# 简单的创建和订阅 Flux 或 Mono 的方法
最简单的上手 Flux
和 Mono
的方式就是使用相应类提供的多种工厂方法之一。
比如,如果要创建一个 String
的序列,你可以直接列举它们,或者将它们放到一个集合里然后用来创建 Flux,如下:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
2
3
4
工厂方法的其他例子如下:
Mono<String> noData = Mono.empty(); // 1
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); // 2
2
3
4
5
- 注意,即使没有值,工厂方法仍然采用通用的返回类型
- 第一个参数是 range 的开始,第二个参数是要生成的元素个数
在订阅(subscribe)的时候,Flux
和 Mono
使用 Java 8 lambda 表达式。 .subscribe()
方法有多种不同的方法签名,你可以传入各种不同的 lambda 形式的参数来定义回调。如下所示:
基于 lambda 的对
Flux
的订阅(subscribe)
subscribe(); // 1
subscribe(Consumer<? super T> consumer); // 2
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer); // 3
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer); // 4
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer); // 5
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 订阅并触发序列
- 对每一个生成的元素进行消费
- 对正常元素进行消费,也对错误进行响应
- 对正常元素和错误均有响应,还定义了序列正常完成后的回调
- 对正常元素、错误和完成信号均有响应,同时也定义了对该
subscribe
方法返回的Subscription
执行的回调。
以上方法会返回一个 Subscription
的引用,如果不再需要更多元素你可以通过它来取消订阅。 取消订阅时,源头会停止生成新的数据,并清理相关资源。取消和清理的操作在 Reactor 中是在 接口 Disposable
中定义的。
# subscribe 方法示例
下面对 subscribe
的 5 个不同签名的方法的示例,如下是一个无参的基本方法的使用:
Flux<Integer> ints = Flux.range(1, 3); // 1
ints.subscribe(); // 2
2
- 配置一个在订阅时会产生3个值的
Flux
- 最简单的订阅方式
第二行代码没有任何输出,但是它确实执行了。Flux
产生了 3 个值。如果我们传入一个 lambda,我们就可以看到这几个值,如下一个列子:
Flux<Integer> ints = Flux.range(1, 3); // 1
ints.subscribe(i -> System.out.println(i)); // 2
2
- 配置一个在订阅时会产生3个值的
Flux
- 订阅它并打印值
第二行代码会输入如下内容:
1
2
3
2
3
为了演示下一个方法签名,我们故意引入一个错误,如下所示:
Flux<Integer> ints = Flux.range(1, 4) // 1
.map(i -> { // 2
if (i <= 3) return i; // 3
throw new RuntimeException("Got to 4"); // 4
});
ints.subscribe(i -> System.out.println(i), // 5
error -> System.err.println("Error: " + error));
2
3
4
5
6
7
- 配置一个在订阅时会产生4个值的
Flux
- 为了对元素进行处理,我们需要一个 map 操作
- 对于多数元素,返回值本身
- 对其中一个元素抛出错误
- 订阅的时候定义如何进行错误处理
现在我们有两个 lambda 表达式:一个是用来处理正常数据,一个用来处理错误。 刚才的代码输出如下:
1
2
3
Error: java.lang.RuntimeException: Got to 4
2
3
4
下一个 subscribe
方法的签名既有错误处理,还有一个完成后的处理,如下:
Flux<Integer> ints = Flux.range(1, 4); // 1
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> {System.out.println("Done");}); // 2
2
3
4
- 配置一个在订阅时会产生4个值的
Flux
- 订阅时定义错误和完成信号的处理
错误和完成信号都是终止信号,并且二者只会出现其中之一。为了能够最终全部正常完成,你必须处理错误信号。
用于处理完成信号的 lambda 是一对空的括号,因为它实际上匹配的是 Runnalbe
接口中的 run
方法, 不接受参数。刚才的代码输出如下:
1
2
3
4
Done
2
3
4
5
最后一个 subscribe
方法签名包含一个自定义的 subscriber
(下一节会介绍到):
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> {System.out.println("Done");},
s -> ss.request(10));
ints.subscribe(ss);
2
3
4
5
6
7
上面这个例子中,我们把一个自定义的 Subscriber
作为 subscribe
方法的最后一个参数。 下边的例子是这个自定义的 Subscriber
,这是一个对 Subscriber
的最简单实现:
package io.projectreactor.samples;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
public class SampleSubscriber<T> extends BaseSubscriber<T> {
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}
public void hookOnNext(T value) {
System.out.println(value);
request(1);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
SampleSubscriber
类继承自 BaseSubscriber
,在 Reactor 中, 推荐用户扩展它来实现自定义的 Subscriber
。这个类提供了一些 hook 方法,我们可以通过重写它们来调整 subscriber 的行为。 默认情况下,它会触发一个无限个数的请求,但是当你想自定义请求元素的个数的时候,扩展 BaseSubscriber
就很方便了。
扩展的时候通常至少要覆盖 hookOnSubscribe(Subscription subscription)
和 hookOnNext(T value)
这两个方法。这个例子中, hookOnSubscribe
方法打印一段话到标准输出,然后进行第一次请求。 然后 hookOnNext
同样进行了打印,同时逐个处理剩余请求。
SampleSubscriber
输出如下:
Subscribed
1
2
3
4
2
3
4
5
建议你同时重写
hookOnError
、hookOnCancel
,以及hookOnComplete
方法。 你最好也重写hookFinally
方法。SampleSubscribe
确实是一个最简单的实现了 请求有限个数元素的Subscriber
。
后边还会再讨论
BaseSubscriber
。
响应式流规范定义了另一个 subscribe
方法的签名,它只接收一个自定义的 Subscriber
, 没有其他的参数,如下所示:
subscribe(Subscriber<? super T> subscriber);
如果你已经有一个 Subscriber
,那么这个方法签名还是挺有用的。况且,你可能还会用到它 来做一些订阅相关(subscription-related)的回调。比如,你想要自定义「背压(backpressure)」 并且自己来触发请求。
在这种情况下,使用 BaseSubscriber
抽象类就很方便,因为它提供了很好的配置「背压」 的方法。
使用 BaseSubscriber
来配置「背压」
Flux<String> source = someStringSource();
source.map(String::toUpperCase)
.subscribe(new BaseSubscriber<String>() { // 1
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 2
request(1); // 3
}
@Override
protected void hookOnNext(String value) {
request(1); // 4
}
// 5
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
BaseSubscriber
是一个抽象类,所以我们创建一个匿名内部类BaseSubscriber
定义了多种用于处理不同信号的 hook。它还定义了一些捕获Subscription
对象的现成方法,这些方法可以用在 hook 中request(n)
就是这样一个方法。它能够在任何 hook 中,通过 subscription 向上游传递 背压请求。这里我们在开始这个流的时候请求1个元素值- 随着接收到新的值,我们继续以每次请求一个元素的节奏从源头请求值
- 其他 hooks 有
hookOnComplete
,hookOnError
,hookOnCancel
, andhookFinally
(它会在流终止的时候被调用,传入一个SignalType
作为参数)
当你修改请求操作的时候,你必须注意让 subscriber 向上提出足够的需求, 否则上游的 Flux 可能会被「卡住」。所以
BaseSubscriber
在进行扩展的时候要覆盖hookOnSubscribe
和onNext
,这样你至少会调用request
一次。
BaseSubscriber
还提供了 requestUnbounded()
方法来切换到「无限」模式(等同于 request(Long.MAX_VALUE)
)。
# 可编程式地创建一个序列
在这一小节,我们介绍如何通过定义相对应的事件(onNext
、onError
和onComplete
) 创建一个 Flux
或 Mono
。所有这些方法都通过 API 来触发我们叫做 sink(池) 的事件。 sink 的类型不多,我们快速过一下。
# Generate
最简单的创建 Flux
的方式就是使用 generate
方法。
这是一种 同步地, 逐个地 产生值的方法,意味着 sink 是一个 SynchronousSink
而且其 next()
方法在每次回调的时候最多只能被调用一次。你也可以调用 error(Throwable)
或者 complete()
,不过是可选的。
最有用的一种方式就是同时能够记录一个状态值(state),从而在使用 sink 发出下一个元素的时候能够 基于这个状态值去产生元素。此时生成器(generator)方法就是一个 BiFunction<S, SynchronousSink<T>, S>
, 其中 <S>
是状态对象的类型。你需要提供一个 Supplier<S>
来初始化状态值,而生成器需要 在每一「回合」生成元素后返回新的状态值(供下一回合使用)。
例如我们使用一个 int
作为状态值。
基于状态值的 generate
示例
Flux<String> flux = Flux.generate(
() -> 0, // 1
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state); // 2
if (state == 10) sink.complete(); // 3
return state + 1; // 4
});
2
3
4
5
6
7
- 初始化状态值(state)为 0
- 我们基于状态值 state 来生成下一个值(state 乘以 3)
- 我们也可以用状态值来决定什么时候终止序列
- 返回一个新的状态值 state,用于下一次调用
上面的代码生成了「3 x」的乘法表:
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
2
3
4
5
6
7
8
9
10
11
我们也可以使用可变(mutable)类型(如上例,原生类型及其包装类,以及 String 等属于不可变类型) 的 <S>
。上边的例子也可以用 AtomicLong
作为状态值,在每次生成后改变它的值。
可变类型的状态变量
Flux<String> flux = Flux.generate(
AtomicLong::new, // 1
(state, sink) -> {
long i = state.getAndIncrement(); // 2
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state; // 3
});
2
3
4
5
6
7
8
- 这次我们初始化一个可变类型的状态值
- 改变状态值
- 返回 同一个 实例作为新的状态值
如果状态对象需要清理资源,可以使用 generate(Supplier<S>, BiFunction, Consumer<S>)
这个签名方法来清理状态对象(Consumer 在序列终止才被调用)。
下面是一个在 generate 方法中增加 Consumer
的例子:
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> { // 1
long i = state.getAndIncrement(); // 2
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state; // 3
}, (state) -> System.out.println("state: " + state)); // 4
}
2
3
4
5
6
7
8
9
- 同样,初始化一个可变对象作为状态变量
- 改变状态
- 返回 同一个 实例作为新的状态
- 我们会看到最后一个状态值(11)会被这个
Consumer
lambda 输出
如果 state 使用了数据库连接或者其他需要最终进行清理的资源,这个 Consumer
lambda 可以用来在最后关闭连接或完成相关的其他清理任务。
# Create
作为一个更高级的创建 Flux
的方式, create
方法的生成方式既可以是同步, 也可以是异步的,并且还可以每次发出多个元素。
该方法用到了 FluxSink
,后者同样提供 next
,error
和 complete
等方法。 与 generate
不同的是,create
不需要状态值,另一方面,它可以在回调中触发 多个事件(即使是在未来的某个时间)。
create
有个好处就是可以将现有的 API 转为响应式,比如监听器的异步方法。
假设你有一个监听器 API,它按 chunk 处理数据,有两种事件:(1)一个 chunk 数据准备好的事件;(2)处理结束的事件。如下:
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}
2
3
4
你可以使用 create
方法将其转化为响应式类型 Flux<T>
:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register( // 4
new MyEventListener<String>() { // 1
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s); // 2
}
}
public void processComplete() {
sink.complete(); // 3
}
});
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 桥接
MyEventListener
- 每一个 chunk 的数据转化为
Flux
中的一个元素 processComplete
事件转换为onComplete
- 所有这些都是在
myEventProcessor
执行时异步执行的
此外,既然 create
可以是异步地,并且能够控制背压,你可以通过提供一个 OverflowStrategy
来定义背压行为。
IGNORE
: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致IllegalStateException
ERROR
: 当下游跟不上节奏的时候发出一个IllegalStateException
的错误信号DROP
:当下游没有准备好接收新的元素的时候抛弃这个元素LATEST
:让下游只得到上游最新的元素BUFFER
:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致OutOfMemoryError
)
Mono
也有一个用于create
的生成器(generator)——MonoSink
,它不能生成多个元素, 因此会抛弃第一个元素之后的所有元素。
# 推送(push)模式
create
的一个变体是 push
,适合生成事件流。与 create
类似,push
也可以是异步地, 并且能够使用以上各种溢出策略(overflow strategies)管理背压。每次只有一个生成线程可以调用 next
,complete
或 error
。
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() { // 1
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s); // 2
}
}
public void processComplete() {
sink.complete(); // 3
}
public void processError(Throwable e) {
sink.error(e); // 4
}
});
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 桥接
SingleThreadEventListener
API - 在监听器所在线程中,事件通过调用
next
被推送到 sink complete
事件也在同一个线程中error
事件也在同一个线程中
# 推送/拉取(push/pull)混合模式
不像 push
,create
可以用于 push
或 pull
模式,因此适合桥接监听器的 的 API,因为事件消息会随时异步地到来。回调方法 onRequest
可以被注册到 FluxSink
以便跟踪请求。这个回调可以被用于从源头请求更多数据,或者通过在下游请求到来 的时候传递数据给 sink 以实现背压管理。这是一种推送/拉取混合的模式, 因为下游可以从上游拉取已经就绪的数据,上游也可以在数据就绪的时候将其推送到下游。
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {
public void onMessage(List<String> messages) {
for(String s : messages) {
sink.next(s); // 3
}
}
});
sink.onRequest(n -> {
List<String> messages = myMessageProcessor.request(n); // 1
for(String s : message) {
sink.next(s); // 2
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- 当有请求的时候取出一个 message
- 如果有就绪的 message,就发送到 sink
- 后续异步到达的 message 也会被发送给 sink
# 清理(Cleaning up)
onDispose
和 onCancel
这两个回调用于在被取消和终止后进行清理工作。 onDispose
可用于在 Flux
完成,有错误出现或被取消的时候执行清理。 onCancel
只用于针对「取消」信号执行相关操作,会先于 onDispose
执行。
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel()) // 1
.onDispose(() -> channel.close()) // 2
});
2
3
4
5
onCancel
在取消时被调用。onDispose
在有完成、错误和取消时被调用
# Handle
handle
方法有些不同,它在 Mono
和 Flux
中都有。然而,它是一个实例方法 (instance method),意思就是它要链接在一个现有的源后使用(与其他操作符一样)。
它与 generate
比较类似,因为它也使用 SynchronousSink
,并且只允许元素逐个发出。 然而,handle
可被用于基于现有数据源中的元素生成任意值,有可能还会跳过一些元素。 这样,可以把它当做 map
与 filter
的组合。handle
方法签名如下:
handle(BiConsumer<T, SynchronousSink<R>>)
举个例子,响应式流规范允许 null
这样的值出现在序列中。假如你想执行一个类似 map
的操作,你想利用一个现有的具有映射功能的方法,但是它会返回 null,这时候怎么办呢?
例如,下边的方法可以用于 Integer 序列,映射为字母或 null 。
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}
2
3
4
5
6
7
我们可以使用 handle
来去掉其中的 null。
将 handle
用于一个 "映射 + 过滤 null" 的场景
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i); // 1
if (letter != null) // 2
sink.next(letter); // 3
});
alphabet.subscribe(System.out::println);
2
3
4
5
6
7
8
- 映射到字母。
- 如果返回的是 null
- 就不会调用
sink.next
从而过滤掉
输出如下:
M
I
T
2
3
# 调度器(Schedulers)
Reactor, 就像 RxJava,也可以被认为是 并发无关(concurrency agnostic) 的。意思就是, 它并不强制要求任何并发模型。更进一步,它将选择权交给开发者。不过,它还是提供了一些方便 进行并发执行的库。
在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler
。 Scheduler
(opens new window) 是一个拥有广泛实现类的抽象接口。 Schedulers
(opens new window) 类提供的静态方法用于达成如下的执行环境:
- 当前线程(
Schedulers.immediate()
) - 可重用的单线程(
Schedulers.single()
)。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器(Scheduler)被废弃。如果你想使用专一的线程,就对每一个调用使用Schedulers.newSingle()
- 弹性线程池(
Schedulers.elastic()
。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()
能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源,见 如何包装一个同步阻塞的调用? - 固定大小线程池(
Schedulers.parallel()
)。所创建线程池的大小与 CPU 个数等同
此外,你还可以使用 Schedulers.fromExecutorService(ExecutorService)
基于现有的 ExecutorService
创建 Scheduler
。(虽然不太建议,不过你也可以使用 Executor
来创建)。你也可以使用 newXXX
方法来创建不同的调度器。比如 Schedulers.newElastic(yourScheduleName)
创建一个新的名为 yourScheduleName
的弹性调度器。
操作符基于非阻塞算法实现,从而可以利用到某些调度器的工作窃取(work stealing) 特性的好处。
一些操作符默认会使用一个指定的调度器(通常也允许开发者调整为其他调度器)例如, 通过工厂方法 Flux.interval(Duration.ofMillis(300))
生成的每 300ms 打点一次的 Flux<Long>
, 默认情况下使用的是 Schedulers.parallel()
,下边的代码演示了如何将其装换为 Schedulers.single()
:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
Reactor 提供了两种在响应式链中调整调度器 Scheduler
的方法:publishOn
和 subscribeOn
。 它们都接受一个 Scheduler
作为参数,从而可以改变调度器。但是 publishOn
在链中出现的位置 是有讲究的,而 subscribeOn
则无所谓。要理解它们的不同,你首先要理解 subscribe() 之前什么都不会发生。
在 Reactor 中,当你在操作链上添加操作符的时候,你可以根据需要在 Flux
和 Mono
的实现中包装其他的 Flux
和 Mono
。一旦你订阅(subscribe)了它,一个 Subscriber
的链 就被创建了,一直向上到第一个 publisher 。这些对开发者是不可见的,开发者所能看到的是最外一层的 Flux
(或 Mono
)和 Subscription
,但是具体的任务是在中间这些跟操作符相关的 subscriber 上处理的。
基于此,我们仔细研究一下 publishOn
和 subscribeOn
这两个操作符:
publishOn
的用法和处于订阅链(subscriber chain)中的其他操作符一样。它将上游 信号传给下游,同时执行指定的调度器Scheduler
的某个工作线程上的回调。 它会 改变后续的操作符的执行所在线程 (直到下一个publishOn
出现在这个链上)。subscribeOn
用于订阅(subscription)过程,作用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。所以,无论你把subscribeOn
至于操作链的什么位置, 它都会影响到源头的线程执行环境(context)。 但是,它不会影响到后续的publishOn
,后者仍能够切换其后操作符的线程执行环境。
只有操作链中最早的
subscribeOn
调用才算数。
# 线程模型
Flux
和 Mono
不会创建线程。一些操作符,比如 publishOn
,会创建线程。同时,作为一种任务共享形式, 这些操作符可能会从其他任务池(work pool)——如果其他任务池是空闲的话——那里「偷」线程。因此, 无论是 Flux
、Mono
还是 Subscriber
都应该精于线程处理。它们依赖这些操作符来管理线程和任务池。
publishOn
强制下一个操作符(很可能包括下一个的下一个…)来运行在一个不同的线程上。 类似的,subscribeOn
强制上一个操作符(很可能包括上一个的上一个…)来运行在一个不同的线程上。 记住,在你订阅(subscribe)前,你只是定义了处理流程,而没有启动发布者。基于此,Reactor 可以使用这些规则来决定如何执行操作链。然后,一旦你订阅了,整个流程就开始工作了。
下边的例子演示了支持任务共享的多线程模型:
Flux.range(1, 10000) // 1
.publishOn(Schedulers.parallel()) // 2
.subscribe(result) // 3
2
3
- 创建一个有 10,000 个元素的
Flux
- 创建等同于 CPU 个数的线程(最小为4)
subscribe()
之前什么都不会发生
Scheduler.parallel()
创建一个基于单线程 ExecutorService
的固定大小的任务线程池。 因为可能会有一个或两个线程导致问题,它总是至少创建 4 个线程。然后 publishOn 方法便共享了这些任务线程, 当 publishOn
请求元素的时候,会从任一个正在发出元素的线程那里获取元素。这样, 就是进行了任务共享(一种资源共享方式)。Reactor 还提供了好几种共享资源的方式,请参考 Schedulers (opens new window)。
Scheduler.elastic()
也能创建线程,它能够很方便地创建专门的线程(以便跑一些可能会阻塞资源的任务, 比如一个同步服务),请见 如何包装一个同步阻塞的调用?。
内部机制保证了这些操作符能够借助自增计数器(incremental counters)和警戒条件(guard conditions) 以线程安全的方式工作。例如,如果我们有四个线程处理一个流(就像上边的例子),每一个请求会让计数器自增, 这样后续的来自不同线程的请求就能拿到正确的元素。
# 处理错误
如果想了解有哪些可用于错误处理的操作符,请参考 错误处理。
在响应式流中,错误(error)是终止(terminal)事件。当有错误发生时,它会导致流序列停止, 并且错误信号会沿着操作链条向下传递,直至遇到你定义的 Subscriber
及其 onError
方法。
这样的错误还是应该在应用层面解决的。比如,你可能会将错误信息显示在用户界面,或者通过某个 REST 端点(endpoint)发出。因此,订阅者(subscriber)的 onError
方法是应该定义的。
如果没有定义,
onError
会抛出UnsupportedOperationException
。你可以接下来再 检测错误,并通过Exceptions.isErrorCallbackNotImplemented
方法捕获和处理它。
Reactor 还提供了其他的用于在链中处理错误的方法,即 错误处理操作(error-handling operators)。
在你了解错误处理操作符之前,你必须牢记 响应式流中的任何错误都是一个终止事件。 即使用了错误处理操作符,也不会让源头流序列继续。而是将
onError
信号转化为一个 新的 序列 的开始。换句话说,它代替了被终结的 上游 流序列。
现在我们来逐个看看错误处理的方法。需要的时候我们会同时用到命令式编程风格的 try
代码块来作比较。
# 错误处理方法
你也许熟悉在 try-catch 代码块中处理异常的几种方法。常见的包括如下几种:
- 捕获并返回一个静态的缺省值。
- 捕获并执行一个异常处理方法。
- 捕获并动态计算一个候补值来顶替。
- 捕获,并再包装为某一个
业务相关的异常
,然后再抛出业务异常。 - 捕获,记录错误日志,然后继续抛出。
- 使用
finally
来清理资源,或使用 Java 7 引入的 "try-with-resource"。
以上所有这些在 Reactor 都有相应的基于 error-handling 操作符处理方式。
在开始研究这些操作符之前,我们先准备好响应式链(reactive chain)方式和 try-catch 代码块方式(以便对比)。
当订阅的时候,位于链结尾的 onError
回调方法和 catch
块类似,一旦有异常,执行过程会跳入到 catch:
Flux<String> s = Flux.range(1, 10)
.map(v -> doSomethingDangerous(v)) // 1
.map(v -> doSecondTransform(v)); // 2
s.subscribe(value -> System.out.println("RECEIVED " + value), // 3
error -> System.err.println("CAUGHT " + error) // 4
);
2
3
4
5
6
- 执行 map 转换,有可能抛出异常
- 如果没问题,执行第二个 map 转换操作
- 所有转换成功的值都打印出来
- 一旦有错误,序列(sequence)终止,并打印错误信息
这与 try/catch 代码块是类似的:
try {
for (int i = 1; i < 11; i++) {
String v1 = doSomethingDangerous(i); // 1
String v2 = doSecondTransform(v1); // 2
System.out.println("RECEIVED " + v2);
}
} catch (Throwable t) {
System.err.println("CAUGHT " + t); // 3
}
2
3
4
5
6
7
8
9
- 如果这里抛出异常
- 后续的代码跳过
- 执行过程直接到这
既然我们准备了两种方式做对比,我们就来看一下不同的错误处理场景,以及相应的操作符。
# 静态缺省值
与第 (1) 条(捕获并返回一个静态的缺省值)对应的是 onErrorReturn
:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
2
3
你还可以通过判断错误信息的内容,来筛选哪些要给出缺省值,哪些仍然让错误继续传递下去:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
2
3
# 异常处理方法
如果你不只是想要在发生错误的时候给出缺省值,而是希望提供一种更安全的处理数据的方式, 可以使用 onErrorResume
。这与第 (2) 条(捕获并执行一个异常处理方法)类似。
假设,你会尝试从一个外部的不稳定服务获取数据,但仍然会在本地缓存一份 可能 有些过期的数据, 因为缓存的读取更加可靠。可以这样来做:
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k)) // 1
.onErrorResume(e -> getFromCache(k)); // 2
2
3
- 对于每一个 key, 异步地调用一个外部服务
- 如果对外部服务的调用失败,则再去缓存中查找该 key。注意,这里无论
e
是什么,都会执行异常处理方法
就像 onErrorReturn
,onErrorResume
也有可以用于预先过滤错误内容的方法变体,可以基于异常类或 Predicate
进行过滤。它实际上是用一个 Function
来作为参数,还可以返回一个新的流序列。
Flux.just("timeout1", "unknown", "key2")
.flatMap(k -> callExternalService(k))
.onErrorResume(error -> { // 1
if (error instanceof TimeoutException) // 2
return getFromCache(k);
else if (error instanceof UnknownKeyException) // 3
return registerNewEntry(k, "DEFAULT");
else
return Flux.error(error); // 4
});
2
3
4
5
6
7
8
9
10
- 这个函数式允许开发者自行决定如何处理
- 如果源超时,使用本地缓存
- 如果源找不到对应的 key,创建一个新的实体
- 否则, 将问题「重新抛出」
# 动态候补值
有时候并不想提供一个错误处理方法,而是想在接收到错误的时候计算一个候补的值。这类似于第 (3) 条(捕获并动态计算一个候补值)。
例如,如果你的返回类型本身就有可能包装有异常(比如 Future.complete(T success)
vs Future.completeExceptionally(Throwable error)
),你有可能使用流中的错误包装起来实例化 返回值。
这也可以使用上一种错误处理方法的方式(使用 onErrorResume
)解决,代码如下:
erroringFlux.onErrorResume(error -> Mono.just( // 1
myWrapper.fromError(error) // 2
));
2
3
- 在
onErrorResume
中,使用Mono.just
创建一个Mono
- 将异常包装到另一个类中
# 捕获并重新抛出
在「错误处理方法」的例子中,基于 flatMap
方法的最后一行,我们可以猜到如何做到第 (4) 条(捕获,包装到一个业务相关的异常,然后抛出业务异常):
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original)
);
2
3
4
5
然而还有一个更加直接的方法—— onErrorMap
:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
2
3
# 记录错误日志
如果对于错误你只是想在不改变它的情况下做出响应(如记录日志),并让错误继续传递下去, 那么可以用 doOnError
方法。这对应第 (5) 条(捕获,记录错误日志,并继续抛出)。 这个方法与其他以 doOn
开头的方法一样,只起副作用("side-effect")。它们对序列都是只读, 而不会带来任何改动。
如下边的例子所示,我们会记录错误日志,并且还通过变量自增统计错误发生个数。
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k)) // 1
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k); // 2
})
.onErrorResume(e -> getFromCache(k)); // 3
2
3
4
5
6
7
8
9
- 对外部服务的调用失败
- 记录错误日志
- 然后回调错误处理方法
# 使用资源和 try-catch 代码块
最后一个要与命令式编程对应的对比就是使用 Java 7 "try-with-resources" 或 finally
代码块清理资源。这是第 (6) 条(使用 finally
代码块清理资源或使用 Java 7 引入的 "try-with-resource")。在 Reactor 中都有对应的方法: using
和 doFinally
:
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true); // 4
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
Flux<String> flux =
Flux.using(
() -> disposableInstance, // 1
disposable -> Flux.just(disposable.toString()), // 2
Disposable::dispose // 3
);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 第一个 lambda 生成资源,这里我们返回模拟的(mock)
Disposable
- 第二个 lambda 处理资源,返回一个
Flux<T>
- 第三个 lambda 在 2) 中的资源
Flux
终止或取消的时候,用于清理资源 - 在订阅或执行流序列之后,
isDisposed
会置为true
另一方面, doFinally
在序列终止(无论是 onComplete
、onError
还是取消)的时候被执行, 并且能够判断是什么类型的终止事件(完成、错误还是取消?)。
LongAdder statsCancel = new LongAdder(); // 1
Flux<String> flux =
Flux.just("foo", "bar")
.doFinally(type -> {
if (type == SignalType.CANCEL) // 2
statsCancel.increment(); // 3
})
.take(1); // 4
2
3
4
5
6
7
8
9
- 我们想进行统计,所以用到了
LongAdder
doFinally
用SignalType
检查了终止信号的类型- 如果只是取消,那么统计数据自增
take(1)
能够在发出 1 个元素后取消流
# 演示终止方法 onError
为了演示当错误出现的时候如何导致上游序列终止,我们使用 Flux.interval
构造一个更加直观的例子。 这个 interval 操作符会在每 x 单位的时间发出一个自增的 Long
值。
Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.onErrorReturn("Uh oh");
flux.subscribe(System.out::println);
Thread.sleep(2100); // 1
2
3
4
5
6
7
8
9
10
- 注意
interval
默认基于一个 timerScheduler
来执行。 如果我们想在 main 方法中运行, 我们需要调用sleep
,这样程序就可以在还没有产生任何值的时候就退出了
每 250ms 打印出一行信息,如下:
tick 0
tick 1
tick 2
Uh oh
2
3
4
即使多给了 1 秒钟时间,也没有更多的 tick 信号由 interval
产生了,所以序列确实被错误信号终止了。
# 重试
还有一个用于错误处理的操作符你可能会用到,就是 retry
,见文知意,用它可以对出现错误的序列进行重试。
问题是它对于上游 Flux
是基于重订阅(re-subscribing)的方式。这实际上已经一个不同的序列了, 发出错误信号的序列仍然是终止了的。为了验证这一点,我们可以在继续用上边的例子,增加一个 retry(1)
代替 onErrorReturn
来重试一次。
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.elapsed() // 1
.retry(1)
.subscribe(System.out::println, System.err::println); // 2
Thread.sleep(2100); // 3
2
3
4
5
6
7
8
9
10
elapsed
会关联从当前值与上个值发出的时间间隔(如下边输出的内容中的 259/249/251)- 我们还是要看一下
onError
时的内容 - 确保我们有足够的时间可以进行 4x2 次 tick
输出如下:
259,tick 0
249,tick 1
251,tick 2
506,tick 0 // 1
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
2
3
4
5
6
7
- 一个新的
interval
从 tick 0 开始。多出来的 250ms 间隔来自于第 4 次 tick, 就是导致出现异常并执行 retry 的那次
可见, retry(1)
不过是再一次从新订阅了原始的 interval
,从 tick 0 开始。第二次, 由于异常再次出现,便将异常传递到下游了。
还有一个「高配版」的 retry
(retryWhen
),它使用一个伴随("companion") Flux
来判断对某次错误是否要重试。这个伴随 Flux
是由操作符创建的,但是由开发者包装它, 从而实现对重试操作的配置。
这个伴随 Flux
是一个 Flux<Throwable>
,它作为 retryWhen
的唯一参数被传递给一个 Function
,你可以定义这个 Function
并让它返回一个新的 Publisher<?>
。重试的循环 会这样运行:
- 每次出现错误,错误信号会发送给伴随
Flux
,后者已经被你用Function
包装。 - 如果伴随
Flux
发出元素,就会触发重试。 - 如果伴随
Flux
完成(complete),重试循环也会停止,并且原始序列也会 完成(complete)。 - 如果伴随
Flux
产生一个错误,重试循环停止,原始序列也停止 或 完成,并且这个错误会导致 原始序列失败并终止。
了解前两个场景的区别是很重要的。如果让伴随 Flux
完成(complete)等于吞掉了错误。如下代码用 retryWhen
模仿了 retry(3)
的效果:
Flux<String> flux = Flux
.<String>error(new IllegalArgumentException()) // 1
.doOnError(System.out::println) // 2
.retryWhen(companion -> companion.take(3)); // 3
2
3
4
- 持续产生错误
- 在 retry 之前 的
doOnError
可以让我们看到错误 - 这里,我们认为前 3 个错误是可以重试的(
take(3)
),再有错误就放弃
事实上,上边例子最终得到的是一个 空的 Flux
,但是却 成功 完成了。反观对同一个 Flux
调用 retry(3)
的话,最终是以最后一个 error 终止 Flux
,故而 retryWhen
与之不同。
实现同样的效果需要一些额外的技巧:
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.retryWhen(companion -> companion
.zipWith(Flux.range(1, 4), // 1
(error, index) -> { // 2
if (index < 4) return index; // 3
else throw Exceptions.propagate(error); // 4
})
);
2
3
4
5
6
7
8
9
- 技巧一:使用
zip
和一个「重试个数 + 1」的range
zip
方法让你可以在对重试次数计数的同时,仍掌握着原始的错误(error)- 允许三次重试,小于 4 的时候发出一个值
- 为了使序列以错误结束。我们将原始异常在三次重试之后抛出
类似的代码也可以被用于实现 exponential backoff and retry 模式 (重试指定的次数, 且每一次重试之间停顿的时间逐渐增加),参考 最佳实践。
# 在操作符或函数式中处理异常
总体来说,所有的操作符自身都可能包含触发异常的代码,或自定义的可能导致失败的代码, 所以它们都自带一些错误处理方式。
一般来说,一个 不受检异常(Unchecked Exception) 总是由 onError
传递。例如, 在一个 map
方法中抛出 RuntimeException
会被翻译为一个 onError
事件,如下:
Flux.just("foo")
.map(s -> { throw new IllegalArgumentException(s); })
.subscribe(v -> System.out.println("GOT VALUE"),
e -> System.out.println("ERROR: " + e));
2
3
4
上边代码输出如下:
ERROR: java.lang.IllegalArgumentException: foo
Exception
可以在其被传递给onError
之前,使用 内部错误 Hook 进行调整。
Reactor,定义了一系列的能够导致「严重失败」的错误(比如 OutOfMemoryError
),也可参考 Exceptions.throwIfFatal
方法。这些错误意味着 Reactor 无力处理只能抛出,无法传递下去。
还有些情况下不受检异常仍然无法传递下去(多数处于subscribe 和 request 阶段), 因为可能由于多线程竞争导致两次
onError
或onComplete
的情况。当这种竞争发生的时候, 无法传递下去的错误信号就被「丢弃」了。这些情况仍然可以通过自定义的 hook 来搞定,见 丢弃事件的 Hooks。
你可能会问:「那么 受检查异常(Checked Exceptions)?」
如果你需要调用一个声明为 throws
异常的方法,你仍然需要使用 try-catch
代码块处理异常。 有几种方式:
- 捕获异常,并修复它,流序列正常继续。
- 捕获异常,并把它包装(wrap)到一个 不受检异常 中,然后抛出(中断序列)。工具类
Exceptions
可用于这种方式(我们马上会讲到)。 - 如果你气我返回一个
Flux
(例如在flatMap
中),将异常包装在一个产生错误的Flux
中:return Flux.error(checkedException)
(流序列也会终止)。
Reactor 有一个工具类 Exceptions
,可以确保在收到受检异常的时候将其包装(wrap)起来。
- 如果需要,可以使用
Exceptions.propagate
方法来包装异常,它同样会首先调用throwIfFatal
, 并且不会包装RuntimeException
。 - 使用
Exceptions.unwrap
方法来得到原始的未包装的异常(追溯最初的异常)。
下面是一个 map
的例子,它使用的 convert 方法会抛出 IOException
:
public String convert(int i) throws IOException {
if (i > 3) {
throw new IOException("boom " + i);
}
return "OK " + i;
}
2
3
4
5
6
现在想象你将这个方法用于一个 map
中,你必须明确捕获这个异常,并且你的 map
方法不能再次抛出它。 所以你可以将其以 RuntimeException
的形式传递给 onError
:
Flux<String> converted = Flux
.range(1, 10)
.map(i -> {
try { return convert(i); }
catch (IOException e) { throw Exceptions.propagate(e); }
});
2
3
4
5
6
当后边订阅上边的这个 Flux
并响应错误(比如在用户界面)的时候,如果你想处理 IOException, 你还可以再将其转换为原始的异常。如下:
converted.subscribe(
v -> System.out.println("RECEIVED: " + v),
e -> {
if (Exceptions.unwrap(e) instanceof IOException) {
System.out.println("Something bad happened with I/O");
} else {
System.out.println("Something bad happened");
}
}
);
2
3
4
5
6
7
8
9
10
# Processors
Processors 既是一种特别的发布者(Publisher
)又是一种订阅者(Subscriber
)。 那意味着你可以 订阅一个 Processor
(通常它们会实现 Flux
),也可以调用相关方法来手动 插入数据到序列,或终止序列。
Processor 有多种类型,它们都有特别的语义规则,但是在你研究它们之前,最好问一下 自己如下几个问题:
# 我是否需要使用 Processor
多数情况下,你应该进行避免使用 Processor
,它们较难正确使用,主要用于一些特殊场景下。
如果你觉得 Processor
适合你的使用场景,请首先看一下是否尝试过以下两种替代方式:
- 是否有一个或多个操作符的组合能够满足需求?(见 操作符总结)
- 可编程式地创建一个序列 操作符是否能解决问题?(通常这些操作符 可以用来桥接非响应式的 API,它们提供了一个「sink」,在生成数据流序列方面, 概念上类似于
Processor
)
如果看了以上替代方案,你仍然觉得需要一个 Processor
,阅读 现有的 Processors 总览 这一节来了解一下不同的实现吧。
# 使用 Sink
门面对象来线程安全地生成流
比起直接使用 Reactor 的 Processors
,更好的方式是通过调用一次 sink()
来得到 Processor
的 Sink
。
FluxProcessor
的 sink 是线程安全的「生产者(producer)」,因此能够在应用程序中 多线程并发地生成数据。例如,一个线程安全的序列化(serialized)的 sink 能够通过 UnicastProcessor
创建:
UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<Integer> sink = processor.sink(overflowStrategy);
2
多个生产者线程可以并发地生成数据到以下的序列化 sink。
sink.next(n);
根据 Processor
及其配置,next
产生的溢出有两种可能的处理方式:
- 一个无限的 processor 通过丢弃或缓存自行处理溢出。
- 一个有限的 processor 阻塞在
IGNORE
策略,或将overflowStrategy
应用于sink
。
# 现有的 Processors 总览
Reactor Core 内置多种 Processor
。这些 processor 具有不同的语法,大概分为三类。 下边简要介绍一下这三种 processor:
- 直接的(direct) (
DirectProcessor
和UnicastProcessor
):这些 processors 只能通过直接 调用Sink
的方法来推送数据 - 同步的(synchronous) (
EmitterProcessor
和ReplayProcessor
):这些 processors 既可以 直接调用Sink
方法来推送数据,也可以通过订阅到一个上游的发布者来同步地产生数据 - 异步的(asynchronous)(
WorkQueueProcessor
和TopicProcessor
):这些 processors 可以将从多个上游发布者得到的数据推送下去。由于使用了RingBuffer
的数据结构来 缓存多个来自上游的数据,因此更加有健壮性
异步的 processor 在实例化的时候最复杂,因为有许多不同的选项。因此它们暴露出一个 Builder
接口。 而简单的 processors 有静态的工厂方法。
# DirectProcessor
DirectProcessor
可以将信号分发给零到多个订阅者(Subscriber
)。它是最容易实例化的,使用静态方法 create()
即可。另一方面,它的不足是无法处理背压。所以,当 DirectProcessor
推送的是 N 个元素,而至少有一个订阅者的请求个数少于 N 的时候,就会发出一个 IllegalStateException
。
一旦 Processor
终止(通常通过调用它的 Sink
的 error(Throwable)
或 complete()
方法), 虽然它允许更多的订阅者订阅它,但是会立即向它们重新发送终止信号。
# UnicastProcessor
UnicastProcessor
可以使用一个内置的缓存来处理背压。代价就是它最多只能有一个订阅者。
UnicastProcessor
有多种选项,因此提供多种不同的 create
静态方法。例如,它默认是 无限的(unbounded) :如果你在在订阅者还没有请求数据的情况下让它推送数据,它会缓存所有数据。
可以通过提供一个自定义的 Queue
的具体实现传递给 create
工厂方法来改变默认行为。如果给出的队列是 有限的(bounded), 并且缓存已满,而且未收到下游的请求,processor 会拒绝推送数据。
在上边 有限的 例子中,还可以在构造 processor 的时候提供一个回调方法,这个回调方法可以在每一个 被拒绝推送的元素上调用,从而让开发者有机会清理这些元素。
# EmitterProcessor
EmitterProcessor
能够向多个订阅者发送数据,并且可以对每一个订阅者进行背压处理。它本身也可以订阅一个 Publisher
并同步获得数据。
最初如果没有订阅者,它仍然允许推送一些数据到缓存,缓存大小由 bufferSize
定义。 之后如果仍然没有订阅者订阅它并消费数据,对 onNext
的调用会阻塞,直到有订阅者接入 (这时只能并发地订阅了)。
因此第一个订阅者会收到最多 bufferSize
个元素。然而之后, processor 不会重新发送(replay) 数据给后续的订阅者。这些后续接入的订阅者只能获取到它们开始订阅 之后 推送的数据。这个内部的 缓存会继续用于背压的目的。
默认情况下,如果所有的订阅者都取消了(基本意味着它们都不再订阅(un-subscribed)了), 它会清空内部缓存,并且不再接受更多的订阅者。这一点可以通过 create
静态工厂方法的 autoCancel
参数来配置。
# ReplayProcessor
ReplayProcessor
会缓存直接通过自身的 Sink
推送的元素,以及来自上游发布者的元素, 并且后来的订阅者也会收到重发(replay)的这些元素。
可以通过多种配置方式创建它:
- 缓存一个元素(
cacheLast
)。 - 缓存一定个数的历史元素(
create(int)
),所有的历史元素(create()
)。 - 缓存基于时间窗期间内的元素(
createTimeout(Duration)
)。 - 缓存基于历史个数和时间窗的元素(
createSizeOrTimeout(int, Duration)
)。
# TopicProcessor
TopicProcessor
是一个异步的 processor,它能够重发来自多个上游发布者的元素, 这需要在创建它的时候配置 shared
(见 build()
的 share(boolean)
配置)。
注意,如果你企图在并发环境下通过并发的上游 Publisher 调用 TopicProcessor
的 onNext
、 onComplete
,或 onError
方法,就必须配置 shared。
否则,并发调用就是非法的,从而 processor 是完全兼容响应式流规范的。
TopicProcessor
能够对多个订阅者发送数据。它通过对每一个订阅者关联一个线程来实现这一点, 这个线程会一直执行直到 processor 发出 onError
或 onComplete
信号,或关联的订阅者被取消。 最多可以接受的订阅者个数由构造者方法 executor
指定,通过提供一个有限线程数的 ExecutorService
来限制这一个数。
这个 processor 基于一个 RingBuffer
数据结构来存储已发送的数据。每一个订阅者线程 自行管理其相关的数据在 RingBuffer
中的索引。
这个 processor 也有一个 autoCancel
构造器方法:如果设置为 true
(默认的),那么当 所有的订阅者取消之后,源 Publisher
(s) 也就被取消了。
# WorkQueueProcessor
WorkQueueProcessor
也是一个异步的 processor,也能够重发来自多个上游发布者的元素, 同样在创建时需要配置 shared
(它多数构造器配置与 TopicProcessor
相同)。
它放松了对响应式流规范的兼容,但是好处就在于相对于 TopicProcessor
来说需要更少的资源。 它仍然基于 RingBuffer
,但是不再要求每一个订阅者都关联一个线程,因此相对于 TopicProcessor
来说更具扩展性。
代价在于分发模式有些区别:来自订阅者的请求会汇总在一起,并且这个 processor 每次只对一个 订阅者发送数据,因此需要循环(round-robin)对订阅者发送数据,而不是一次全部发出的模式。
无法保证完全公平的循环分发。
WorkQueueProcessor
多数构造器方法与 TopicProcessor
相同,比如 autoCancel
、share
, 以及 waitStrategy
。下游订阅者的最大数目同样由构造器 executor
配置的 ExecutorService
决定。
你最好注意不要有太多订阅者订阅
WorkQueueProcessor
,因为这 会锁住 processor。 如果你需要限制订阅者数量,最好使用一个ThreadPoolExecutor
或ForkJoinPool
。这个 processor 能够检测到(线程池)容量并在订阅者过多时抛出异常。