Reactor - 最佳实践
# 如何包装一个同步阻塞的调用
很多时候,信息源是同步和阻塞的。在 Reactor 中,我们用以下方式处理这种信息源:
Mono blockingWrapper = Mono.fromCallable(() -> { // 1
return /* make a remote synchronous call */ // 2
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic()); // 3
2
3
4
- 使用
fromCallable
方法生成一个 Mono - 返回同步、阻塞的资源
- 使用
Schedulers.elastic()
确保对每一个订阅来说运行在一个专门的线程上
因为调用返回一个值,所以你应该使用 Mono。你应该使用 Schedulers.elastic
因为它会创建一个专门的线程来等待阻塞的调用返回。
注意 subscribeOn
方法并不会「订阅」这个 Mono
。它只是指定了订阅操作使用哪个 Scheduler
。
# 用在 Flux
上的操作符好像没起作用,为啥
请确认你确实对调用 .subscribe()
的发布者应用了这个操作符。
Reactor 的操作符是装饰器(decorators)。它们会返回一个不同的(发布者)实例, 这个实例对上游序列进行了包装并增加了一些的处理行为。所以,最推荐的方式是将操作符「串」起来。
对比下边的两个例子:
没有串起来(不正确的)
Flux<String> flux = Flux.just("foo", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); // 1
flux.subscribe(next -> System.out.println("Received: " + next));
2
3
- 问题在这,
flux
变量并没有改变
串起来(正确的)
Flux<String> flux = Flux.just("foo", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));
2
3
下边的例子更好(因为更简洁):
串起来(最好的)
Flux<String> secrets = Flux
.just("foo", "chain")
.map(secret -> secret.replaceAll(".", "*"))
.subscribe(next -> System.out.println("Received: " + next));
2
3
4
第一个例子的输出:
Received: foo
Received: chain
2
后两个例子的输出:
Received: ***
Received: *****
2
# Mono
zipWith
/zipWhen
没有被调用
例子
myMethod.process("a") // 这个方法返回 Mono<Void>
.zipWith(myMethod.process("b"), combinator) //没有被调用
.subscribe();
2
3
如果源 Mono
为空或是一个 Mono<Void>
(Mono<Void>
通常用于「空」的场景), 下边的组合操作就不会被调用。
对于类似 zipWith
的用于转换的操作符来说,这是比较典型的场景。 这些操作符依赖于数据元素来转换为输出的元素。 如果任何一个序列是空的,则返回的就是一个空序列,所以请谨慎使用。 例如在 then()
之后使用 zipWith()
就会导致这一问题。
对于以 Function
作为参数的 and
更是如此,因为返回的 Mono 是依赖于收到的数据懒加载的(而对于空序列或 Void
的序列来说是没有数据发出来的)。
你可以使用 .defaultIfEmpty(T)
将空序列替换为包含 T
类型缺省值的序列(而不是 Void
序列), 从而可以避免类似的情况出现。举例如下:
在 zipWhen
前使用 defaultIfEmpty
myMethod.emptySequenceForKey("a") // 这个方法返回一个空的 Mono<String>
.defaultIfEmpty("") // 将空序列转换为包含字符串 "" 的序列
.zipWhen(aString -> myMethod.process("b")) // 当 "" 发出时被调用
.subscribe();
2
3
4
# 如何用 retryWhen
来实现 retry(3)
的效果
retryWhen
方法比较复杂,希望下边的一段模拟 retry(3)
的代码能够帮你更好地理解它的工作方式:
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.retryWhen(companion -> companion
.zipWith(Flux.range(1, 4), // 1
(error, index) -> { // 2
if (index < 4) return index; // 3
else throw Exceptions.propagate(error); // 4
})
);
2
3
4
5
6
7
8
9
- 技巧一:使用
zip
和一个「重试个数 + 1」的range
。 zip
方法让你可以在对重试次数计数的同时,仍掌握着原始的错误(error)。- 允许三次重试,小于 4 的时候发出一个值。
- 为了使序列以错误结束。我们将原始异常在三次重试之后抛出。
# 如何使用 retryWhen
进行 exponential backoff
Exponential backoff 的意思是进行的多次重试之间的间隔越来越长, 从而避免对源系统造成过载,甚至宕机。基本原理是,如果源产生了一个错误, 那么已经是处于不稳定状态,可能不会立刻复原。所以,如果立刻就重试可能会产生另一个错误, 导致源更加不稳定。
下面是一段实现 exponential backoff 效果的例子,每次重试的间隔都会递增 (伪代码: delay = attempt number * 100 milliseconds):
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.retryWhen(companion -> companion
.doOnNext(s -> System.out.println(s + " at " + LocalTime.now())) // 1
.zipWith(Flux.range(1, 4), (error, index) -> { // 2
if (index < 4) return index;
else throw Exceptions.propagate(error);
})
.flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) // 3
.doOnNext(s -> System.out.println("retried at " + LocalTime.now())) // 4
);
2
3
4
5
6
7
8
9
10
11
- 记录错误出现的时间
- 使用
retryWhen
+zipWith
的技巧实现重试3次的效果 - 通过
flatMap
来实现延迟时间递增的效果 - 同样记录重试的时间
订阅它,输出如下:
java.lang.IllegalArgumentException at 18:02:29.338
retried at 18:02:29.459 // 1
java.lang.IllegalArgumentException at 18:02:29.460
retried at 18:02:29.663 // 2
java.lang.IllegalArgumentException at 18:02:29.663
retried at 18:02:29.964 // 3
java.lang.IllegalArgumentException at 18:02:29.964
2
3
4
5
6
7
- 第一次重试延迟大约 100ms
- 第二次重试延迟大约 200ms
- 第三次重试延迟大约 300ms
# 如何使用 publishOn()
确保线程关联性
如 调度器(Schedulers) 所述,publishOn()
可以用来切换执行线程。 publishOn
能够影响到其之后的操作符的执行线程,直到有新的 publishOn
出现。 所以 publishOn
的位置很重要。
比如下边的例子, map()
中的 transform
方法是在 scheduler1
的一个工作线程上执行的, 而 doOnNext()
中的 processNext
方法是在 scheduler2
的一个工作线程上执行的。 单线程的调度器可能用于对不同阶段的任务或不同的订阅者确保线程关联性。
EmitterProcessor<Integer> processor = EmitterProcessor.create();
processor.publishOn(scheduler1)
.map(i -> transform(i))
.publishOn(scheduler2)
.doOnNext(i -> processNext(i))
.subscribe();
2
3
4
5
6