Young Kbt blog Young Kbt blog
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)

Shp Liu

朝圣的使徒,正在走向编程的至高殿堂!
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)
  • MyBatis

  • MyBatis-Plus

  • 中间件 - ActiveMQ

  • 中间件 - RabbitMQ

  • 中间件 - RocketMQ

  • 中间件 - Kafka

  • 高性能服务器 - Nginx

  • 响应式框架 - Reactor

    • Reactor - 快速上手
    • Reactor - 响应式编程
    • Reactor - 核心特征
      • Reactor 核心特性
      • Flux 包含 0-N 个元素的异步序列
      • Mono 异步的 0-1 结果
      • 简单的创建和订阅 Flux 或 Mono 的方法
        • subscribe 方法示例
      • 可编程式地创建一个序列
        • Generate
        • Create
        • 推送(push)模式
        • 推送/拉取(push/pull)混合模式
        • 清理(Cleaning up)
        • Handle
      • 调度器(Schedulers)
      • 线程模型
      • 处理错误
        • 错误处理方法
        • 动态候补值
        • 捕获并重新抛出
        • 记录错误日志
        • 使用资源和 try-catch 代码块
        • 演示终止方法 onError
        • 重试
        • 在操作符或函数式中处理异常
      • Processors
        • 我是否需要使用 Processor
        • 使用 Sink 门面对象来线程安全地生成流
        • 现有的 Processors 总览
        • DirectProcessor
        • UnicastProcessor
        • EmitterProcessor
        • ReplayProcessor
        • TopicProcessor
        • WorkQueueProcessor
    • Reactor - 对 Kotlin 的支持
    • Reactor - 单元测试
    • Reactor - 调试 Reactor
    • Reactor - 高级特性与概念
    • Reactor - 操作符总结
    • Reactor - 最佳实践
    • Reactor - 扩展性
  • 框架
  • 响应式框架 - Reactor
Young Kbt
2024-11-02
目录

Reactor - 核心特征

  • Reactor 核心特性
  • Flux 包含 0-N 个元素的异步序列
  • Mono 异步的 0-1 结果
  • 简单的创建和订阅 Flux 或 Mono 的方法
    • subscribe 方法示例
  • 可编程式地创建一个序列
    • Generate
    • Create
    • Handle
  • 调度器(Schedulers)
  • 线程模型
  • 处理错误
    • 错误处理方法
    • 在操作符或函数式中处理异常
  • Processors
    • 我是否需要使用 Processor
    • 使用 Sink 门面对象来线程安全地生成流
    • 现有的 Processors 总览

# Reactor 核心特性

Reactor 项目的主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。

Reactor 引入了实现 Publisher 的响应式类 Flux 和 Mono,以及丰富的操作方式。 一个 Flux 对象代表一个包含 0..N 个元素的响应式序列,而一个 Mono 对象代表一个包含 零/一个(0..1)元素的结果。

这种区别为这俩类型带来了语义上的信息——表明了异步处理逻辑所面对的元素基数。比如, 一个 HTTP 请求产生一个响应,所以对其进行 count 操作是没有多大意义的。表示这样一个 结果的话,应该用 Mono<HttpResponse> 而不是 Flux<HttpResponse>,因为要置于其上的 操作通常只用于处理 0/1 个元素。

有些操作可以改变基数,从而需要切换类型。比如,count 操作用于 Flux,但是操作 返回的结果是 Mono<Long>。

# Flux 包含 0-N 个元素的异步序列

image-20241102010252633

Flux<T> 是一个能够发出 0 到 N 个元素的标准的 Publisher<T>,它会被一个「错误(error)」 或「完成(completion)」信号终止。因此,一个 flux 的可能结果是一个 value、completion 或 error。 就像在响应式流规范中规定的那样,这三种类型的信号被翻译为面向下游的 onNext,onComplete和onError方法。

由于多种不同的信号可能性,Flux 可以作为一种通用的响应式类型。注意,所有的信号事件, 包括代表终止的信号事件都是可选的:如果没有 onNext 事件但是有一个 onComplete 事件, 那么发出的就是 空的 有限序列,但是去掉 onComplete 那么得到的就是一个 无限的 空序列。 当然,无限序列也可以不是空序列,比如,Flux.interval(Duration) 生成的是一个 Flux<Long>, 这就是一个无限地周期性发出规律 tick 的时钟序列。

# Mono 异步的 0-1 结果

image-20241102010320385

Mono<T> 是一种特殊的 Publisher<T>, 它最多发出一个元素,然后终止于一个 onComplete 信号或一个 onError 信号。

它只适用其中一部分可用于 Flux 的操作。比如,(两个 Mono 的)结合类操作可以忽略其中之一 而发出另一个 Mono,也可以将两个都发出,对于后一种情况会切换为一个 Flux。

例如,Mono#concatWith(Publisher) 返回一个 Flux,而 Mono#then(Mono) 返回另一个 Mono。

注意,Mono 可以用于表示「空」的只有完成概念的异步处理(比如 Runnable)。这种用 Mono<Void> 来创建。

# 简单的创建和订阅 Flux 或 Mono 的方法

最简单的上手 Flux 和 Mono 的方式就是使用相应类提供的多种工厂方法之一。

比如,如果要创建一个 String 的序列,你可以直接列举它们,或者将它们放到一个集合里然后用来创建 Flux,如下:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
1
2
3
4

工厂方法的其他例子如下:

Mono<String> noData = Mono.empty(); // 1

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); // 2
1
2
3
4
5
  1. 注意,即使没有值,工厂方法仍然采用通用的返回类型
  2. 第一个参数是 range 的开始,第二个参数是要生成的元素个数

在订阅(subscribe)的时候,Flux 和 Mono 使用 Java 8 lambda 表达式。 .subscribe() 方法有多种不同的方法签名,你可以传入各种不同的 lambda 形式的参数来定义回调。如下所示:

基于 lambda 的对 Flux 的订阅(subscribe)

subscribe(); // 1

subscribe(Consumer<? super T> consumer); // 2

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); // 3

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); // 4

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); // 5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  1. 订阅并触发序列
  2. 对每一个生成的元素进行消费
  3. 对正常元素进行消费,也对错误进行响应
  4. 对正常元素和错误均有响应,还定义了序列正常完成后的回调
  5. 对正常元素、错误和完成信号均有响应,同时也定义了对该 subscribe 方法返回的 Subscription 执行的回调。

以上方法会返回一个 Subscription 的引用,如果不再需要更多元素你可以通过它来取消订阅。 取消订阅时,源头会停止生成新的数据,并清理相关资源。取消和清理的操作在 Reactor 中是在 接口 Disposable 中定义的。

# subscribe 方法示例

下面对 subscribe 的 5 个不同签名的方法的示例,如下是一个无参的基本方法的使用:

Flux<Integer> ints = Flux.range(1, 3); // 1
ints.subscribe(); // 2
1
2
  1. 配置一个在订阅时会产生3个值的 Flux
  2. 最简单的订阅方式

第二行代码没有任何输出,但是它确实执行了。Flux 产生了 3 个值。如果我们传入一个 lambda,我们就可以看到这几个值,如下一个列子:

Flux<Integer> ints = Flux.range(1, 3); // 1
ints.subscribe(i -> System.out.println(i)); // 2
1
2
  1. 配置一个在订阅时会产生3个值的 Flux
  2. 订阅它并打印值

第二行代码会输入如下内容:

1
2
3
1
2
3

为了演示下一个方法签名,我们故意引入一个错误,如下所示:

Flux<Integer> ints = Flux.range(1, 4) // 1
      .map(i -> { // 2
        if (i <= 3) return i; // 3
        throw new RuntimeException("Got to 4"); // 4
      });
ints.subscribe(i -> System.out.println(i), // 5
      error -> System.err.println("Error: " + error));
1
2
3
4
5
6
7
  1. 配置一个在订阅时会产生4个值的 Flux
  2. 为了对元素进行处理,我们需要一个 map 操作
  3. 对于多数元素,返回值本身
  4. 对其中一个元素抛出错误
  5. 订阅的时候定义如何进行错误处理

现在我们有两个 lambda 表达式:一个是用来处理正常数据,一个用来处理错误。 刚才的代码输出如下:

1
2
3
Error: java.lang.RuntimeException: Got to 4
1
2
3
4

下一个 subscribe 方法的签名既有错误处理,还有一个完成后的处理,如下:

Flux<Integer> ints = Flux.range(1, 4); // 1
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> {System.out.println("Done");}); // 2
1
2
3
4
  1. 配置一个在订阅时会产生4个值的 Flux
  2. 订阅时定义错误和完成信号的处理

错误和完成信号都是终止信号,并且二者只会出现其中之一。为了能够最终全部正常完成,你必须处理错误信号。

用于处理完成信号的 lambda 是一对空的括号,因为它实际上匹配的是 Runnalbe 接口中的 run 方法, 不接受参数。刚才的代码输出如下:

1
2
3
4
Done
1
2
3
4
5

最后一个 subscribe 方法签名包含一个自定义的 subscriber(下一节会介绍到):

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> {System.out.println("Done");},
    s -> ss.request(10));
ints.subscribe(ss);
1
2
3
4
5
6
7

上面这个例子中,我们把一个自定义的 Subscriber 作为 subscribe 方法的最后一个参数。 下边的例子是这个自定义的 Subscriber,这是一个对 Subscriber 的最简单实现:

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

        public void hookOnSubscribe(Subscription subscription) {
                System.out.println("Subscribed");
                request(1);
        }

        public void hookOnNext(T value) {
                System.out.println(value);
                request(1);
        }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

SampleSubscriber 类继承自 BaseSubscriber,在 Reactor 中, 推荐用户扩展它来实现自定义的 Subscriber。这个类提供了一些 hook 方法,我们可以通过重写它们来调整 subscriber 的行为。 默认情况下,它会触发一个无限个数的请求,但是当你想自定义请求元素的个数的时候,扩展 BaseSubscriber 就很方便了。

扩展的时候通常至少要覆盖 hookOnSubscribe(Subscription subscription) 和 hookOnNext(T value) 这两个方法。这个例子中, hookOnSubscribe 方法打印一段话到标准输出,然后进行第一次请求。 然后 hookOnNext 同样进行了打印,同时逐个处理剩余请求。

SampleSubscriber 输出如下:

Subscribed
1
2
3
4
1
2
3
4
5

建议你同时重写 hookOnError、hookOnCancel,以及 hookOnComplete 方法。 你最好也重写 hookFinally 方法。SampleSubscribe 确实是一个最简单的实现了 请求有限个数元素的 Subscriber。

后边还会再讨论 BaseSubscriber。

响应式流规范定义了另一个 subscribe 方法的签名,它只接收一个自定义的 Subscriber, 没有其他的参数,如下所示:

subscribe(Subscriber<? super T> subscriber);
1

如果你已经有一个 Subscriber,那么这个方法签名还是挺有用的。况且,你可能还会用到它 来做一些订阅相关(subscription-related)的回调。比如,你想要自定义「背压(backpressure)」 并且自己来触发请求。

在这种情况下,使用 BaseSubscriber 抽象类就很方便,因为它提供了很好的配置「背压」 的方法。

使用 BaseSubscriber 来配置「背压」

Flux<String> source = someStringSource();

source.map(String::toUpperCase)
      .subscribe(new BaseSubscriber<String>() { // 1
          @Override
          protected void hookOnSubscribe(Subscription subscription) {
              // 2
              request(1); // 3
          }

          @Override
          protected void hookOnNext(String value) {
              request(1); // 4
          }
          // 5
      });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  1. BaseSubscriber 是一个抽象类,所以我们创建一个匿名内部类
  2. BaseSubscriber 定义了多种用于处理不同信号的 hook。它还定义了一些捕获 Subscription 对象的现成方法,这些方法可以用在 hook 中
  3. request(n) 就是这样一个方法。它能够在任何 hook 中,通过 subscription 向上游传递 背压请求。这里我们在开始这个流的时候请求1个元素值
  4. 随着接收到新的值,我们继续以每次请求一个元素的节奏从源头请求值
  5. 其他 hooks 有 hookOnComplete, hookOnError, hookOnCancel, and hookFinally (它会在流终止的时候被调用,传入一个 SignalType 作为参数)

当你修改请求操作的时候,你必须注意让 subscriber 向上提出足够的需求, 否则上游的 Flux 可能会被「卡住」。所以 BaseSubscriber 在进行扩展的时候要覆盖 hookOnSubscribe 和 onNext,这样你至少会调用 request 一次。

BaseSubscriber 还提供了 requestUnbounded() 方法来切换到「无限」模式(等同于 request(Long.MAX_VALUE))。

# 可编程式地创建一个序列

在这一小节,我们介绍如何通过定义相对应的事件(onNext、onError和onComplete) 创建一个 Flux 或 Mono。所有这些方法都通过 API 来触发我们叫做 sink(池) 的事件。 sink 的类型不多,我们快速过一下。

# Generate

最简单的创建 Flux 的方式就是使用 generate 方法。

这是一种 同步地, 逐个地 产生值的方法,意味着 sink 是一个 SynchronousSink 而且其 next() 方法在每次回调的时候最多只能被调用一次。你也可以调用 error(Throwable) 或者 complete(),不过是可选的。

最有用的一种方式就是同时能够记录一个状态值(state),从而在使用 sink 发出下一个元素的时候能够 基于这个状态值去产生元素。此时生成器(generator)方法就是一个 BiFunction<S, SynchronousSink<T>, S>, 其中 <S> 是状态对象的类型。你需要提供一个 Supplier<S> 来初始化状态值,而生成器需要 在每一「回合」生成元素后返回新的状态值(供下一回合使用)。

例如我们使用一个 int 作为状态值。

基于状态值的 generate 示例

Flux<String> flux = Flux.generate(
    () -> 0, // 1
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); // 2
      if (state == 10) sink.complete(); // 3
      return state + 1; // 4
    });
1
2
3
4
5
6
7
  1. 初始化状态值(state)为 0
  2. 我们基于状态值 state 来生成下一个值(state 乘以 3)
  3. 我们也可以用状态值来决定什么时候终止序列
  4. 返回一个新的状态值 state,用于下一次调用

上面的代码生成了「3 x」的乘法表:

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
1
2
3
4
5
6
7
8
9
10
11

我们也可以使用可变(mutable)类型(如上例,原生类型及其包装类,以及 String 等属于不可变类型) 的 <S>。上边的例子也可以用 AtomicLong 作为状态值,在每次生成后改变它的值。

可变类型的状态变量

Flux<String> flux = Flux.generate(
    AtomicLong::new, // 1
    (state, sink) -> {
      long i = state.getAndIncrement(); // 2
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; // 3
    });
1
2
3
4
5
6
7
8
  1. 这次我们初始化一个可变类型的状态值
  2. 改变状态值
  3. 返回 同一个 实例作为新的状态值

如果状态对象需要清理资源,可以使用 generate(Supplier<S>, BiFunction, Consumer<S>) 这个签名方法来清理状态对象(Consumer 在序列终止才被调用)。

下面是一个在 generate 方法中增加 Consumer 的例子:

Flux<String> flux = Flux.generate(
    AtomicLong::new,
      (state, sink) -> { // 1
      long i = state.getAndIncrement(); // 2
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; // 3
    }, (state) -> System.out.println("state: " + state)); // 4
}
1
2
3
4
5
6
7
8
9
  1. 同样,初始化一个可变对象作为状态变量
  2. 改变状态
  3. 返回 同一个 实例作为新的状态
  4. 我们会看到最后一个状态值(11)会被这个 Consumer lambda 输出

如果 state 使用了数据库连接或者其他需要最终进行清理的资源,这个 Consumer lambda 可以用来在最后关闭连接或完成相关的其他清理任务。

# Create

作为一个更高级的创建 Flux 的方式, create 方法的生成方式既可以是同步, 也可以是异步的,并且还可以每次发出多个元素。

该方法用到了 FluxSink,后者同样提供 next,error 和 complete 等方法。 与 generate 不同的是,create 不需要状态值,另一方面,它可以在回调中触发 多个事件(即使是在未来的某个时间)。

create 有个好处就是可以将现有的 API 转为响应式,比如监听器的异步方法。

假设你有一个监听器 API,它按 chunk 处理数据,有两种事件:(1)一个 chunk 数据准备好的事件;(2)处理结束的事件。如下:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}
1
2
3
4

你可以使用 create 方法将其转化为响应式类型 Flux<T>:

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( // 4
      new MyEventListener<String>() { // 1

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); // 2
          }
        }

        public void processComplete() {
            sink.complete(); // 3
        }
    });
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  1. 桥接 MyEventListener
  2. 每一个 chunk 的数据转化为 Flux 中的一个元素
  3. processComplete 事件转换为 onComplete
  4. 所有这些都是在 myEventProcessor 执行时异步执行的

此外,既然 create 可以是异步地,并且能够控制背压,你可以通过提供一个 OverflowStrategy 来定义背压行为。

  • IGNORE: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException
  • ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号
  • DROP:当下游没有准备好接收新的元素的时候抛弃这个元素
  • LATEST:让下游只得到上游最新的元素
  • BUFFER:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError)

Mono 也有一个用于 create 的生成器(generator)—— MonoSink,它不能生成多个元素, 因此会抛弃第一个元素之后的所有元素。

# 推送(push)模式

create 的一个变体是 push,适合生成事件流。与 create类似,push 也可以是异步地, 并且能够使用以上各种溢出策略(overflow strategies)管理背压。每次只有一个生成线程可以调用 next,complete 或 error。

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { // 1

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); // 2
          }
        }

        public void processComplete() {
            sink.complete(); // 3
        }

        public void processError(Throwable e) {
            sink.error(e); // 4
        }
    });
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  1. 桥接 SingleThreadEventListener API
  2. 在监听器所在线程中,事件通过调用 next 被推送到 sink
  3. complete 事件也在同一个线程中
  4. error 事件也在同一个线程中

# 推送/拉取(push/pull)混合模式

不像 push,create 可以用于 push 或 pull 模式,因此适合桥接监听器的 的 API,因为事件消息会随时异步地到来。回调方法 onRequest 可以被注册到 FluxSink 以便跟踪请求。这个回调可以被用于从源头请求更多数据,或者通过在下游请求到来 的时候传递数据给 sink 以实现背压管理。这是一种推送/拉取混合的模式, 因为下游可以从上游拉取已经就绪的数据,上游也可以在数据就绪的时候将其推送到下游。

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s); // 3
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.request(n); // 1
        for(String s : message) {
           sink.next(s); // 2
        }
    });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  1. 当有请求的时候取出一个 message
  2. 如果有就绪的 message,就发送到 sink
  3. 后续异步到达的 message 也会被发送给 sink

# 清理(Cleaning up)

onDispose 和 onCancel 这两个回调用于在被取消和终止后进行清理工作。 onDispose 可用于在 Flux 完成,有错误出现或被取消的时候执行清理。 onCancel 只用于针对「取消」信号执行相关操作,会先于 onDispose 执行。

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) // 1
        .onDispose(() -> channel.close()) // 2
    });
1
2
3
4
5
  1. onCancel 在取消时被调用。
  2. onDispose 在有完成、错误和取消时被调用

# Handle

handle 方法有些不同,它在 Mono 和 Flux 中都有。然而,它是一个实例方法 (instance method),意思就是它要链接在一个现有的源后使用(与其他操作符一样)。

它与 generate 比较类似,因为它也使用 SynchronousSink,并且只允许元素逐个发出。 然而,handle 可被用于基于现有数据源中的元素生成任意值,有可能还会跳过一些元素。 这样,可以把它当做 map 与 filter 的组合。handle 方法签名如下:

handle(BiConsumer<T, SynchronousSink<R>>)
1

举个例子,响应式流规范允许 null 这样的值出现在序列中。假如你想执行一个类似 map 的操作,你想利用一个现有的具有映射功能的方法,但是它会返回 null,这时候怎么办呢?

例如,下边的方法可以用于 Integer 序列,映射为字母或 null 。

public String alphabet(int letterNumber) {
        if (letterNumber < 1 || letterNumber > 26) {
                return null;
        }
        int letterIndexAscii = 'A' + letterNumber - 1;
        return "" + (char) letterIndexAscii;
}
1
2
3
4
5
6
7

我们可以使用 handle 来去掉其中的 null。

将 handle 用于一个 "映射 + 过滤 null" 的场景

Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); // 1
        if (letter != null) // 2
            sink.next(letter); // 3
    });

alphabet.subscribe(System.out::println);
1
2
3
4
5
6
7
8
  1. 映射到字母。
  2. 如果返回的是 null
  3. 就不会调用 sink.next 从而过滤掉

输出如下:

M
I
T
1
2
3

# 调度器(Schedulers)

Reactor, 就像 RxJava,也可以被认为是 并发无关(concurrency agnostic) 的。意思就是, 它并不强制要求任何并发模型。更进一步,它将选择权交给开发者。不过,它还是提供了一些方便 进行并发执行的库。

在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler。 Scheduler (opens new window) 是一个拥有广泛实现类的抽象接口。 Schedulers (opens new window) 类提供的静态方法用于达成如下的执行环境:

  • 当前线程(Schedulers.immediate())
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器(Scheduler)被废弃。如果你想使用专一的线程,就对每一个调用使用 Schedulers.newSingle()
  • 弹性线程池(Schedulers.elastic()。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。 Schedulers.elastic() 能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源,见 如何包装一个同步阻塞的调用?
  • 固定大小线程池(Schedulers.parallel())。所创建线程池的大小与 CPU 个数等同

此外,你还可以使用 Schedulers.fromExecutorService(ExecutorService) 基于现有的 ExecutorService 创建 Scheduler。(虽然不太建议,不过你也可以使用 Executor 来创建)。你也可以使用 newXXX 方法来创建不同的调度器。比如 Schedulers.newElastic(yourScheduleName) 创建一个新的名为 yourScheduleName 的弹性调度器。

操作符基于非阻塞算法实现,从而可以利用到某些调度器的工作窃取(work stealing) 特性的好处。

一些操作符默认会使用一个指定的调度器(通常也允许开发者调整为其他调度器)例如, 通过工厂方法 Flux.interval(Duration.ofMillis(300)) 生成的每 300ms 打点一次的 Flux<Long>, 默认情况下使用的是 Schedulers.parallel(),下边的代码演示了如何将其装换为 Schedulers.single():

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
1

Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOn 和 subscribeOn。 它们都接受一个 Scheduler 作为参数,从而可以改变调度器。但是 publishOn 在链中出现的位置 是有讲究的,而 subscribeOn 则无所谓。要理解它们的不同,你首先要理解 subscribe() 之前什么都不会发生。

在 Reactor 中,当你在操作链上添加操作符的时候,你可以根据需要在 Flux 和 Mono 的实现中包装其他的 Flux 和 Mono。一旦你订阅(subscribe)了它,一个 Subscriber 的链 就被创建了,一直向上到第一个 publisher 。这些对开发者是不可见的,开发者所能看到的是最外一层的 Flux (或 Mono)和 Subscription,但是具体的任务是在中间这些跟操作符相关的 subscriber 上处理的。

基于此,我们仔细研究一下 publishOn 和 subscribeOn 这两个操作符:

  • publishOn 的用法和处于订阅链(subscriber chain)中的其他操作符一样。它将上游 信号传给下游,同时执行指定的调度器 Scheduler 的某个工作线程上的回调。 它会 改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)。
  • subscribeOn 用于订阅(subscription)过程,作用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。所以,无论你把 subscribeOn 至于操作链的什么位置, 它都会影响到源头的线程执行环境(context)。 但是,它不会影响到后续的 publishOn,后者仍能够切换其后操作符的线程执行环境。

只有操作链中最早的 subscribeOn 调用才算数。

# 线程模型

Flux 和 Mono 不会创建线程。一些操作符,比如 publishOn,会创建线程。同时,作为一种任务共享形式, 这些操作符可能会从其他任务池(work pool)——如果其他任务池是空闲的话——那里「偷」线程。因此, 无论是 Flux、Mono 还是 Subscriber 都应该精于线程处理。它们依赖这些操作符来管理线程和任务池。

publishOn 强制下一个操作符(很可能包括下一个的下一个…)来运行在一个不同的线程上。 类似的,subscribeOn 强制上一个操作符(很可能包括上一个的上一个…)来运行在一个不同的线程上。 记住,在你订阅(subscribe)前,你只是定义了处理流程,而没有启动发布者。基于此,Reactor 可以使用这些规则来决定如何执行操作链。然后,一旦你订阅了,整个流程就开始工作了。

下边的例子演示了支持任务共享的多线程模型:

Flux.range(1, 10000) // 1
    .publishOn(Schedulers.parallel()) // 2
    .subscribe(result) // 3
1
2
3
  1. 创建一个有 10,000 个元素的 Flux
  2. 创建等同于 CPU 个数的线程(最小为4)
  3. subscribe() 之前什么都不会发生

Scheduler.parallel() 创建一个基于单线程 ExecutorService 的固定大小的任务线程池。 因为可能会有一个或两个线程导致问题,它总是至少创建 4 个线程。然后 publishOn 方法便共享了这些任务线程, 当 publishOn 请求元素的时候,会从任一个正在发出元素的线程那里获取元素。这样, 就是进行了任务共享(一种资源共享方式)。Reactor 还提供了好几种共享资源的方式,请参考 Schedulers (opens new window)。

Scheduler.elastic() 也能创建线程,它能够很方便地创建专门的线程(以便跑一些可能会阻塞资源的任务, 比如一个同步服务),请见 如何包装一个同步阻塞的调用?。

内部机制保证了这些操作符能够借助自增计数器(incremental counters)和警戒条件(guard conditions) 以线程安全的方式工作。例如,如果我们有四个线程处理一个流(就像上边的例子),每一个请求会让计数器自增, 这样后续的来自不同线程的请求就能拿到正确的元素。

# 处理错误

如果想了解有哪些可用于错误处理的操作符,请参考 错误处理。

在响应式流中,错误(error)是终止(terminal)事件。当有错误发生时,它会导致流序列停止, 并且错误信号会沿着操作链条向下传递,直至遇到你定义的 Subscriber 及其 onError 方法。

这样的错误还是应该在应用层面解决的。比如,你可能会将错误信息显示在用户界面,或者通过某个 REST 端点(endpoint)发出。因此,订阅者(subscriber)的 onError 方法是应该定义的。

如果没有定义,onError 会抛出 UnsupportedOperationException。你可以接下来再 检测错误,并通过 Exceptions.isErrorCallbackNotImplemented 方法捕获和处理它。

Reactor 还提供了其他的用于在链中处理错误的方法,即 错误处理操作(error-handling operators)。

在你了解错误处理操作符之前,你必须牢记 响应式流中的任何错误都是一个终止事件。 即使用了错误处理操作符,也不会让源头流序列继续。而是将 onError 信号转化为一个 新的 序列 的开始。换句话说,它代替了被终结的 上游 流序列。

现在我们来逐个看看错误处理的方法。需要的时候我们会同时用到命令式编程风格的 try 代码块来作比较。

# 错误处理方法

你也许熟悉在 try-catch 代码块中处理异常的几种方法。常见的包括如下几种:

  1. 捕获并返回一个静态的缺省值。
  2. 捕获并执行一个异常处理方法。
  3. 捕获并动态计算一个候补值来顶替。
  4. 捕获,并再包装为某一个 业务相关的异常,然后再抛出业务异常。
  5. 捕获,记录错误日志,然后继续抛出。
  6. 使用 finally 来清理资源,或使用 Java 7 引入的 "try-with-resource"。

以上所有这些在 Reactor 都有相应的基于 error-handling 操作符处理方式。

在开始研究这些操作符之前,我们先准备好响应式链(reactive chain)方式和 try-catch 代码块方式(以便对比)。

当订阅的时候,位于链结尾的 onError 回调方法和 catch 块类似,一旦有异常,执行过程会跳入到 catch:

Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v)) // 1
    .map(v -> doSecondTransform(v)); // 2
s.subscribe(value -> System.out.println("RECEIVED " + value), // 3
            error -> System.err.println("CAUGHT " + error) // 4
);
1
2
3
4
5
6
  1. 执行 map 转换,有可能抛出异常
  2. 如果没问题,执行第二个 map 转换操作
  3. 所有转换成功的值都打印出来
  4. 一旦有错误,序列(sequence)终止,并打印错误信息

这与 try/catch 代码块是类似的:

try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i); // 1
        String v2 = doSecondTransform(v1); // 2
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); // 3
}
1
2
3
4
5
6
7
8
9
  1. 如果这里抛出异常
  2. 后续的代码跳过
  3. 执行过程直接到这

既然我们准备了两种方式做对比,我们就来看一下不同的错误处理场景,以及相应的操作符。

# 静态缺省值

与第 (1) 条(捕获并返回一个静态的缺省值)对应的是 onErrorReturn:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");
1
2
3

你还可以通过判断错误信息的内容,来筛选哪些要给出缺省值,哪些仍然让错误继续传递下去:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
1
2
3
# 异常处理方法

如果你不只是想要在发生错误的时候给出缺省值,而是希望提供一种更安全的处理数据的方式, 可以使用 onErrorResume。这与第 (2) 条(捕获并执行一个异常处理方法)类似。

假设,你会尝试从一个外部的不稳定服务获取数据,但仍然会在本地缓存一份 可能 有些过期的数据, 因为缓存的读取更加可靠。可以这样来做:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k)) // 1
    .onErrorResume(e -> getFromCache(k)); // 2
1
2
3
  1. 对于每一个 key, 异步地调用一个外部服务
  2. 如果对外部服务的调用失败,则再去缓存中查找该 key。注意,这里无论 e 是什么,都会执行异常处理方法

就像 onErrorReturn,onErrorResume 也有可以用于预先过滤错误内容的方法变体,可以基于异常类或 Predicate 进行过滤。它实际上是用一个 Function 来作为参数,还可以返回一个新的流序列。

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(error -> { // 1
        if (error instanceof TimeoutException) // 2
            return getFromCache(k);
        else if (error instanceof UnknownKeyException) // 3
            return registerNewEntry(k, "DEFAULT");
        else
            return Flux.error(error); // 4
    });
1
2
3
4
5
6
7
8
9
10
  1. 这个函数式允许开发者自行决定如何处理
  2. 如果源超时,使用本地缓存
  3. 如果源找不到对应的 key,创建一个新的实体
  4. 否则, 将问题「重新抛出」

# 动态候补值

有时候并不想提供一个错误处理方法,而是想在接收到错误的时候计算一个候补的值。这类似于第 (3) 条(捕获并动态计算一个候补值)。

例如,如果你的返回类型本身就有可能包装有异常(比如 Future.complete(T success) vs Future.completeExceptionally(Throwable error)),你有可能使用流中的错误包装起来实例化 返回值。

这也可以使用上一种错误处理方法的方式(使用 onErrorResume)解决,代码如下:

erroringFlux.onErrorResume(error -> Mono.just( // 1
        myWrapper.fromError(error) // 2
));
1
2
3
  1. 在 onErrorResume 中,使用 Mono.just 创建一个 Mono
  2. 将异常包装到另一个类中

# 捕获并重新抛出

在「错误处理方法」的例子中,基于 flatMap 方法的最后一行,我们可以猜到如何做到第 (4) 条(捕获,包装到一个业务相关的异常,然后抛出业务异常):

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
        new BusinessException("oops, SLA exceeded", original)
    );
1
2
3
4
5

然而还有一个更加直接的方法—— onErrorMap:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
1
2
3

# 记录错误日志

如果对于错误你只是想在不改变它的情况下做出响应(如记录日志),并让错误继续传递下去, 那么可以用 doOnError 方法。这对应第 (5) 条(捕获,记录错误日志,并继续抛出)。 这个方法与其他以 doOn 开头的方法一样,只起副作用("side-effect")。它们对序列都是只读, 而不会带来任何改动。

如下边的例子所示,我们会记录错误日志,并且还通过变量自增统计错误发生个数。

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k)) // 1
    .doOnError(e -> {
        failureStat.increment();
        log("uh oh, falling back, service failed for key " + k); // 2
    })
    .onErrorResume(e -> getFromCache(k)); // 3
1
2
3
4
5
6
7
8
9
  1. 对外部服务的调用失败
  2. 记录错误日志
  3. 然后回调错误处理方法

# 使用资源和 try-catch 代码块

最后一个要与命令式编程对应的对比就是使用 Java 7 "try-with-resources" 或 finally 代码块清理资源。这是第 (6) 条(使用 finally 代码块清理资源或使用 Java 7 引入的 "try-with-resource")。在 Reactor 中都有对应的方法: using 和 doFinally:

AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); // 4
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};

Flux<String> flux =
Flux.using(
        () -> disposableInstance, // 1
        disposable -> Flux.just(disposable.toString()), // 2
        Disposable::dispose // 3
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  1. 第一个 lambda 生成资源,这里我们返回模拟的(mock) Disposable
  2. 第二个 lambda 处理资源,返回一个 Flux<T>
  3. 第三个 lambda 在 2) 中的资源 Flux 终止或取消的时候,用于清理资源
  4. 在订阅或执行流序列之后, isDisposed 会置为 true

另一方面, doFinally 在序列终止(无论是 onComplete、onError还是取消)的时候被执行, 并且能够判断是什么类型的终止事件(完成、错误还是取消?)。

LongAdder statsCancel = new LongAdder(); // 1

Flux<String> flux =
Flux.just("foo", "bar")
    .doFinally(type -> {
        if (type == SignalType.CANCEL) // 2
          statsCancel.increment(); // 3
    })
    .take(1); // 4
1
2
3
4
5
6
7
8
9
  1. 我们想进行统计,所以用到了 LongAdder
  2. doFinally 用 SignalType 检查了终止信号的类型
  3. 如果只是取消,那么统计数据自增
  4. take(1) 能够在发出 1 个元素后取消流

# 演示终止方法 onError

为了演示当错误出现的时候如何导致上游序列终止,我们使用 Flux.interval 构造一个更加直观的例子。 这个 interval 操作符会在每 x 单位的时间发出一个自增的 Long 值。

Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100); // 1
1
2
3
4
5
6
7
8
9
10
  1. 注意 interval 默认基于一个 timer Scheduler 来执行。 如果我们想在 main 方法中运行, 我们需要调用 sleep,这样程序就可以在还没有产生任何值的时候就退出了

每 250ms 打印出一行信息,如下:

tick 0
tick 1
tick 2
Uh oh
1
2
3
4

即使多给了 1 秒钟时间,也没有更多的 tick 信号由 interval 产生了,所以序列确实被错误信号终止了。

# 重试

还有一个用于错误处理的操作符你可能会用到,就是 retry,见文知意,用它可以对出现错误的序列进行重试。

问题是它对于上游 Flux 是基于重订阅(re-subscribing)的方式。这实际上已经一个不同的序列了, 发出错误信号的序列仍然是终止了的。为了验证这一点,我们可以在继续用上边的例子,增加一个 retry(1) 代替 onErrorReturn 来重试一次。

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .elapsed() // 1
    .retry(1)
    .subscribe(System.out::println, System.err::println); // 2

Thread.sleep(2100); // 3
1
2
3
4
5
6
7
8
9
10
  1. elapsed 会关联从当前值与上个值发出的时间间隔(如下边输出的内容中的 259/249/251)
  2. 我们还是要看一下 onError 时的内容
  3. 确保我们有足够的时间可以进行 4x2 次 tick

输出如下:

259,tick 0
249,tick 1
251,tick 2
506,tick 0 // 1
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
1
2
3
4
5
6
7
  1. 一个新的 interval 从 tick 0 开始。多出来的 250ms 间隔来自于第 4 次 tick, 就是导致出现异常并执行 retry 的那次

可见, retry(1) 不过是再一次从新订阅了原始的 interval,从 tick 0 开始。第二次, 由于异常再次出现,便将异常传递到下游了。

还有一个「高配版」的 retry (retryWhen),它使用一个伴随("companion") Flux 来判断对某次错误是否要重试。这个伴随 Flux 是由操作符创建的,但是由开发者包装它, 从而实现对重试操作的配置。

这个伴随 Flux 是一个 Flux<Throwable>,它作为 retryWhen 的唯一参数被传递给一个 Function,你可以定义这个 Function 并让它返回一个新的 Publisher<?>。重试的循环 会这样运行:

  1. 每次出现错误,错误信号会发送给伴随 Flux,后者已经被你用 Function 包装。
  2. 如果伴随 Flux 发出元素,就会触发重试。
  3. 如果伴随 Flux 完成(complete),重试循环也会停止,并且原始序列也会 完成(complete)。
  4. 如果伴随 Flux 产生一个错误,重试循环停止,原始序列也停止 或 完成,并且这个错误会导致 原始序列失败并终止。

了解前两个场景的区别是很重要的。如果让伴随 Flux 完成(complete)等于吞掉了错误。如下代码用 retryWhen 模仿了 retry(3) 的效果:

Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) // 1
    .doOnError(System.out::println) // 2
    .retryWhen(companion -> companion.take(3)); // 3
1
2
3
4
  1. 持续产生错误
  2. 在 retry 之前 的 doOnError 可以让我们看到错误
  3. 这里,我们认为前 3 个错误是可以重试的(take(3)),再有错误就放弃

事实上,上边例子最终得到的是一个 空的 Flux,但是却 成功 完成了。反观对同一个 Flux 调用 retry(3) 的话,最终是以最后一个 error 终止 Flux,故而 retryWhen 与之不同。

实现同样的效果需要一些额外的技巧:

Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
    .zipWith(Flux.range(1, 4), // 1
          (error, index) -> { // 2
            if (index < 4) return index; // 3
            else throw Exceptions.propagate(error); // 4
          })
    );
1
2
3
4
5
6
7
8
9
  1. 技巧一:使用 zip 和一个「重试个数 + 1」的 range
  2. zip 方法让你可以在对重试次数计数的同时,仍掌握着原始的错误(error)
  3. 允许三次重试,小于 4 的时候发出一个值
  4. 为了使序列以错误结束。我们将原始异常在三次重试之后抛出

类似的代码也可以被用于实现 exponential backoff and retry 模式 (重试指定的次数, 且每一次重试之间停顿的时间逐渐增加),参考 最佳实践。

# 在操作符或函数式中处理异常

总体来说,所有的操作符自身都可能包含触发异常的代码,或自定义的可能导致失败的代码, 所以它们都自带一些错误处理方式。

一般来说,一个 不受检异常(Unchecked Exception) 总是由 onError 传递。例如, 在一个 map 方法中抛出 RuntimeException 会被翻译为一个 onError 事件,如下:

Flux.just("foo")
    .map(s -> { throw new IllegalArgumentException(s); })
    .subscribe(v -> System.out.println("GOT VALUE"),
               e -> System.out.println("ERROR: " + e));
1
2
3
4

上边代码输出如下:

ERROR: java.lang.IllegalArgumentException: foo
1

Exception 可以在其被传递给 onError 之前,使用 内部错误 Hook 进行调整。

Reactor,定义了一系列的能够导致「严重失败」的错误(比如 OutOfMemoryError),也可参考 Exceptions.throwIfFatal 方法。这些错误意味着 Reactor 无力处理只能抛出,无法传递下去。

还有些情况下不受检异常仍然无法传递下去(多数处于subscribe 和 request 阶段), 因为可能由于多线程竞争导致两次 onError 或 onComplete 的情况。当这种竞争发生的时候, 无法传递下去的错误信号就被「丢弃」了。这些情况仍然可以通过自定义的 hook 来搞定,见 丢弃事件的 Hooks。

你可能会问:「那么 受检查异常(Checked Exceptions)?」

如果你需要调用一个声明为 throws 异常的方法,你仍然需要使用 try-catch 代码块处理异常。 有几种方式:

  1. 捕获异常,并修复它,流序列正常继续。
  2. 捕获异常,并把它包装(wrap)到一个 不受检异常 中,然后抛出(中断序列)。工具类 Exceptions 可用于这种方式(我们马上会讲到)。
  3. 如果你气我返回一个 Flux (例如在 flatMap 中),将异常包装在一个产生错误的 Flux中: return Flux.error(checkedException)(流序列也会终止)。

Reactor 有一个工具类 Exceptions,可以确保在收到受检异常的时候将其包装(wrap)起来。

  • 如果需要,可以使用 Exceptions.propagate 方法来包装异常,它同样会首先调用 throwIfFatal, 并且不会包装 RuntimeException。
  • 使用 Exceptions.unwrap 方法来得到原始的未包装的异常(追溯最初的异常)。

下面是一个 map 的例子,它使用的 convert 方法会抛出 IOException:

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}
1
2
3
4
5
6

现在想象你将这个方法用于一个 map 中,你必须明确捕获这个异常,并且你的 map 方法不能再次抛出它。 所以你可以将其以 RuntimeException 的形式传递给 onError:

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });
1
2
3
4
5
6

当后边订阅上边的这个 Flux 并响应错误(比如在用户界面)的时候,如果你想处理 IOException, 你还可以再将其转换为原始的异常。如下:

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);
1
2
3
4
5
6
7
8
9
10

# Processors

Processors 既是一种特别的发布者(Publisher)又是一种订阅者(Subscriber)。 那意味着你可以 订阅一个 Processor(通常它们会实现 Flux),也可以调用相关方法来手动 插入数据到序列,或终止序列。

Processor 有多种类型,它们都有特别的语义规则,但是在你研究它们之前,最好问一下 自己如下几个问题:

# 我是否需要使用 Processor

多数情况下,你应该进行避免使用 Processor,它们较难正确使用,主要用于一些特殊场景下。

如果你觉得 Processor 适合你的使用场景,请首先看一下是否尝试过以下两种替代方式:

  1. 是否有一个或多个操作符的组合能够满足需求?(见 操作符总结)
  2. 可编程式地创建一个序列 操作符是否能解决问题?(通常这些操作符 可以用来桥接非响应式的 API,它们提供了一个「sink」,在生成数据流序列方面, 概念上类似于 Processor)

如果看了以上替代方案,你仍然觉得需要一个 Processor,阅读 现有的 Processors 总览 这一节来了解一下不同的实现吧。

# 使用 Sink 门面对象来线程安全地生成流

比起直接使用 Reactor 的 Processors,更好的方式是通过调用一次 sink() 来得到 Processor 的 Sink。

FluxProcessor 的 sink 是线程安全的「生产者(producer)」,因此能够在应用程序中 多线程并发地生成数据。例如,一个线程安全的序列化(serialized)的 sink 能够通过 UnicastProcessor 创建:

UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<Integer> sink = processor.sink(overflowStrategy);
1
2

多个生产者线程可以并发地生成数据到以下的序列化 sink。

sink.next(n);
1

根据 Processor 及其配置,next 产生的溢出有两种可能的处理方式:

  • 一个无限的 processor 通过丢弃或缓存自行处理溢出。
  • 一个有限的 processor 阻塞在 IGNORE 策略,或将 overflowStrategy 应用于 sink。

# 现有的 Processors 总览

Reactor Core 内置多种 Processor。这些 processor 具有不同的语法,大概分为三类。 下边简要介绍一下这三种 processor:

  • 直接的(direct) (DirectProcessor 和 UnicastProcessor):这些 processors 只能通过直接 调用 Sink 的方法来推送数据
  • 同步的(synchronous) (EmitterProcessor 和 ReplayProcessor):这些 processors 既可以 直接调用 Sink 方法来推送数据,也可以通过订阅到一个上游的发布者来同步地产生数据
  • 异步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):这些 processors 可以将从多个上游发布者得到的数据推送下去。由于使用了 RingBuffer 的数据结构来 缓存多个来自上游的数据,因此更加有健壮性

异步的 processor 在实例化的时候最复杂,因为有许多不同的选项。因此它们暴露出一个 Builder 接口。 而简单的 processors 有静态的工厂方法。

# DirectProcessor

DirectProcessor 可以将信号分发给零到多个订阅者(Subscriber)。它是最容易实例化的,使用静态方法 create() 即可。另一方面,它的不足是无法处理背压。所以,当 DirectProcessor 推送的是 N 个元素,而至少有一个订阅者的请求个数少于 N 的时候,就会发出一个 IllegalStateException。

一旦 Processor 终止(通常通过调用它的 Sink 的 error(Throwable) 或 complete() 方法), 虽然它允许更多的订阅者订阅它,但是会立即向它们重新发送终止信号。

# UnicastProcessor

UnicastProcessor 可以使用一个内置的缓存来处理背压。代价就是它最多只能有一个订阅者。

UnicastProcessor 有多种选项,因此提供多种不同的 create 静态方法。例如,它默认是 无限的(unbounded) :如果你在在订阅者还没有请求数据的情况下让它推送数据,它会缓存所有数据。

可以通过提供一个自定义的 Queue 的具体实现传递给 create 工厂方法来改变默认行为。如果给出的队列是 有限的(bounded), 并且缓存已满,而且未收到下游的请求,processor 会拒绝推送数据。

在上边 有限的 例子中,还可以在构造 processor 的时候提供一个回调方法,这个回调方法可以在每一个 被拒绝推送的元素上调用,从而让开发者有机会清理这些元素。

# EmitterProcessor

EmitterProcessor 能够向多个订阅者发送数据,并且可以对每一个订阅者进行背压处理。它本身也可以订阅一个 Publisher 并同步获得数据。

最初如果没有订阅者,它仍然允许推送一些数据到缓存,缓存大小由 bufferSize 定义。 之后如果仍然没有订阅者订阅它并消费数据,对 onNext 的调用会阻塞,直到有订阅者接入 (这时只能并发地订阅了)。

因此第一个订阅者会收到最多 bufferSize 个元素。然而之后, processor 不会重新发送(replay) 数据给后续的订阅者。这些后续接入的订阅者只能获取到它们开始订阅 之后 推送的数据。这个内部的 缓存会继续用于背压的目的。

默认情况下,如果所有的订阅者都取消了(基本意味着它们都不再订阅(un-subscribed)了), 它会清空内部缓存,并且不再接受更多的订阅者。这一点可以通过 create 静态工厂方法的 autoCancel 参数来配置。

# ReplayProcessor

ReplayProcessor 会缓存直接通过自身的 Sink 推送的元素,以及来自上游发布者的元素, 并且后来的订阅者也会收到重发(replay)的这些元素。

可以通过多种配置方式创建它:

  • 缓存一个元素(cacheLast)。
  • 缓存一定个数的历史元素(create(int)),所有的历史元素(create())。
  • 缓存基于时间窗期间内的元素(createTimeout(Duration))。
  • 缓存基于历史个数和时间窗的元素(createSizeOrTimeout(int, Duration))。

# TopicProcessor

TopicProcessor 是一个异步的 processor,它能够重发来自多个上游发布者的元素, 这需要在创建它的时候配置 shared (见 build() 的 share(boolean) 配置)。

注意,如果你企图在并发环境下通过并发的上游 Publisher 调用 TopicProcessor 的 onNext、 onComplete,或 onError 方法,就必须配置 shared。

否则,并发调用就是非法的,从而 processor 是完全兼容响应式流规范的。

TopicProcessor 能够对多个订阅者发送数据。它通过对每一个订阅者关联一个线程来实现这一点, 这个线程会一直执行直到 processor 发出 onError 或 onComplete 信号,或关联的订阅者被取消。 最多可以接受的订阅者个数由构造者方法 executor 指定,通过提供一个有限线程数的 ExecutorService 来限制这一个数。

这个 processor 基于一个 RingBuffer 数据结构来存储已发送的数据。每一个订阅者线程 自行管理其相关的数据在 RingBuffer 中的索引。

这个 processor 也有一个 autoCancel 构造器方法:如果设置为 true (默认的),那么当 所有的订阅者取消之后,源 Publisher(s) 也就被取消了。

# WorkQueueProcessor

WorkQueueProcessor 也是一个异步的 processor,也能够重发来自多个上游发布者的元素, 同样在创建时需要配置 shared (它多数构造器配置与 TopicProcessor 相同)。

它放松了对响应式流规范的兼容,但是好处就在于相对于 TopicProcessor 来说需要更少的资源。 它仍然基于 RingBuffer,但是不再要求每一个订阅者都关联一个线程,因此相对于 TopicProcessor 来说更具扩展性。

代价在于分发模式有些区别:来自订阅者的请求会汇总在一起,并且这个 processor 每次只对一个 订阅者发送数据,因此需要循环(round-robin)对订阅者发送数据,而不是一次全部发出的模式。

无法保证完全公平的循环分发。

WorkQueueProcessor 多数构造器方法与 TopicProcessor 相同,比如 autoCancel、share, 以及 waitStrategy。下游订阅者的最大数目同样由构造器 executor 配置的 ExecutorService 决定。

你最好注意不要有太多订阅者订阅 WorkQueueProcessor,因为这 会锁住 processor。 如果你需要限制订阅者数量,最好使用一个 ThreadPoolExecutor 或 ForkJoinPool。这个 processor 能够检测到(线程池)容量并在订阅者过多时抛出异常。

编辑此页 (opens new window)
#Reactor
更新时间: 2024/11/02, 09:43:06
Reactor - 响应式编程
Reactor - 对 Kotlin 的支持

← Reactor - 响应式编程 Reactor - 对 Kotlin 的支持→

最近更新
01
技术随笔 - Element Plus 修改包名 原创
11-02
02
Reactor - 扩展性
11-02
03
Reactor - 最佳实践
11-02
更多文章>
Theme by Vdoing | Copyright © 2021-2024 Young Kbt | blog
桂ICP备2021009994号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式