Young Kbt blog Young Kbt blog
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)

Shp Liu

朝圣的使徒,正在走向编程的至高殿堂!
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)
  • MyBatis

  • MyBatis-Plus

  • 中间件 - ActiveMQ

  • 中间件 - RabbitMQ

  • 中间件 - RocketMQ

  • 中间件 - Kafka

  • 高性能服务器 - Nginx

  • 响应式框架 - Reactor

    • Reactor - 快速上手
    • Reactor - 响应式编程
    • Reactor - 核心特征
    • Reactor - 对 Kotlin 的支持
    • Reactor - 单元测试
    • Reactor - 调试 Reactor
    • Reactor - 高级特性与概念
    • Reactor - 操作符总结
    • Reactor - 最佳实践
      • 如何包装一个同步阻塞的调用
      • 用在 Flux 上的操作符好像没起作用,为啥
      • Mono zipWith/zipWhen 没有被调用
      • 如何用 retryWhen 来实现 retry(3) 的效果
      • 如何使用 retryWhen 进行 exponential backoff
      • 如何使用 publishOn() 确保线程关联性
    • Reactor - 扩展性
  • 框架
  • 响应式框架 - Reactor
Young Kbt
2024-11-02
目录

Reactor - 最佳实践

  • 如何包装一个同步阻塞的调用
  • 用在 Flux 上的操作符好像没起作用,为啥
  • Mono zipWith/zipWhen 没有被调用
  • 如何用 retryWhen 来实现 retry(3) 的效果
  • 如何使用 retryWhen 进行 exponential backoff
  • 如何使用 publishOn() 确保线程关联性

# 如何包装一个同步阻塞的调用

很多时候,信息源是同步和阻塞的。在 Reactor 中,我们用以下方式处理这种信息源:

Mono blockingWrapper = Mono.fromCallable(() -> { // 1
    return /* make a remote synchronous call */ // 2
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic()); // 3
1
2
3
4
  1. 使用 fromCallable 方法生成一个 Mono
  2. 返回同步、阻塞的资源
  3. 使用 Schedulers.elastic() 确保对每一个订阅来说运行在一个专门的线程上

因为调用返回一个值,所以你应该使用 Mono。你应该使用 Schedulers.elastic 因为它会创建一个专门的线程来等待阻塞的调用返回。

注意 subscribeOn 方法并不会「订阅」这个 Mono。它只是指定了订阅操作使用哪个 Scheduler。

# 用在 Flux 上的操作符好像没起作用,为啥

请确认你确实对调用 .subscribe() 的发布者应用了这个操作符。

Reactor 的操作符是装饰器(decorators)。它们会返回一个不同的(发布者)实例, 这个实例对上游序列进行了包装并增加了一些的处理行为。所以,最推荐的方式是将操作符「串」起来。

对比下边的两个例子:

没有串起来(不正确的)

Flux<String> flux = Flux.just("foo", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); // 1
flux.subscribe(next -> System.out.println("Received: " + next));
1
2
3
  1. 问题在这, flux 变量并没有改变

串起来(正确的)

Flux<String> flux = Flux.just("foo", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));
1
2
3

下边的例子更好(因为更简洁):

串起来(最好的)

Flux<String> secrets = Flux
  .just("foo", "chain")
  .map(secret -> secret.replaceAll(".", "*"))
  .subscribe(next -> System.out.println("Received: " + next));
1
2
3
4

第一个例子的输出:

Received: foo
Received: chain
1
2

后两个例子的输出:

Received: ***
Received: *****
1
2

# Mono zipWith/zipWhen 没有被调用

例子

myMethod.process("a") // 这个方法返回 Mono<Void>
        .zipWith(myMethod.process("b"), combinator) //没有被调用
        .subscribe();
1
2
3

如果源 Mono 为空或是一个 Mono<Void>(Mono<Void> 通常用于「空」的场景), 下边的组合操作就不会被调用。

对于类似 zipWith 的用于转换的操作符来说,这是比较典型的场景。 这些操作符依赖于数据元素来转换为输出的元素。 如果任何一个序列是空的,则返回的就是一个空序列,所以请谨慎使用。 例如在 then() 之后使用 zipWith() 就会导致这一问题。

对于以 Function 作为参数的 and 更是如此,因为返回的 Mono 是依赖于收到的数据懒加载的(而对于空序列或 Void 的序列来说是没有数据发出来的)。

你可以使用 .defaultIfEmpty(T) 将空序列替换为包含 T 类型缺省值的序列(而不是 Void 序列), 从而可以避免类似的情况出现。举例如下:

在 zipWhen 前使用 defaultIfEmpty

myMethod.emptySequenceForKey("a") // 这个方法返回一个空的 Mono<String>
        .defaultIfEmpty("") // 将空序列转换为包含字符串 "" 的序列
        .zipWhen(aString -> myMethod.process("b")) // 当 "" 发出时被调用
        .subscribe();
1
2
3
4

# 如何用 retryWhen 来实现 retry(3) 的效果

retryWhen 方法比较复杂,希望下边的一段模拟 retry(3) 的代码能够帮你更好地理解它的工作方式:

Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
    .zipWith(Flux.range(1, 4), // 1
          (error, index) -> { // 2
            if (index < 4) return index; // 3
            else throw Exceptions.propagate(error); // 4
          })
    );
1
2
3
4
5
6
7
8
9
  1. 技巧一:使用 zip 和一个「重试个数 + 1」的 range。
  2. zip 方法让你可以在对重试次数计数的同时,仍掌握着原始的错误(error)。
  3. 允许三次重试,小于 4 的时候发出一个值。
  4. 为了使序列以错误结束。我们将原始异常在三次重试之后抛出。

# 如何使用 retryWhen 进行 exponential backoff

Exponential backoff 的意思是进行的多次重试之间的间隔越来越长, 从而避免对源系统造成过载,甚至宕机。基本原理是,如果源产生了一个错误, 那么已经是处于不稳定状态,可能不会立刻复原。所以,如果立刻就重试可能会产生另一个错误, 导致源更加不稳定。

下面是一段实现 exponential backoff 效果的例子,每次重试的间隔都会递增 (伪代码: delay = attempt number * 100 milliseconds):

Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
        .doOnNext(s -> System.out.println(s + " at " + LocalTime.now())) // 1
        .zipWith(Flux.range(1, 4), (error, index) -> { // 2
          if (index < 4) return index;
          else throw Exceptions.propagate(error);
        })
        .flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) // 3
        .doOnNext(s -> System.out.println("retried at " + LocalTime.now())) // 4
    );
1
2
3
4
5
6
7
8
9
10
11
  1. 记录错误出现的时间
  2. 使用 retryWhen + zipWith 的技巧实现重试3次的效果
  3. 通过 flatMap 来实现延迟时间递增的效果
  4. 同样记录重试的时间

订阅它,输出如下:

java.lang.IllegalArgumentException at 18:02:29.338
retried at 18:02:29.459 // 1
java.lang.IllegalArgumentException at 18:02:29.460
retried at 18:02:29.663 // 2
java.lang.IllegalArgumentException at 18:02:29.663
retried at 18:02:29.964 // 3
java.lang.IllegalArgumentException at 18:02:29.964
1
2
3
4
5
6
7
  1. 第一次重试延迟大约 100ms
  2. 第二次重试延迟大约 200ms
  3. 第三次重试延迟大约 300ms

# 如何使用 publishOn() 确保线程关联性

如 调度器(Schedulers) 所述,publishOn() 可以用来切换执行线程。 publishOn 能够影响到其之后的操作符的执行线程,直到有新的 publishOn 出现。 所以 publishOn 的位置很重要。

比如下边的例子, map() 中的 transform 方法是在 scheduler1 的一个工作线程上执行的, 而 doOnNext() 中的 processNext 方法是在 scheduler2 的一个工作线程上执行的。 单线程的调度器可能用于对不同阶段的任务或不同的订阅者确保线程关联性。

EmitterProcessor<Integer> processor = EmitterProcessor.create();
processor.publishOn(scheduler1)
         .map(i -> transform(i))
         .publishOn(scheduler2)
         .doOnNext(i -> processNext(i))
         .subscribe();
1
2
3
4
5
6
编辑此页 (opens new window)
#Reactor
更新时间: 2024/11/02, 09:43:06
Reactor - 操作符总结
Reactor - 扩展性

← Reactor - 操作符总结 Reactor - 扩展性→

最近更新
01
技术随笔 - Element Plus 修改包名 原创
11-02
02
Reactor - 扩展性
11-02
03
Reactor - 操作符总结
11-02
更多文章>
Theme by Vdoing | Copyright © 2021-2024 Young Kbt | blog
桂ICP备2021009994号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式