Young Kbt blog Young Kbt blog
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)

Shp Liu

朝圣的使徒,正在走向编程的至高殿堂!
首页
  • java基础

    • Java基础
    • Java集合
    • Java反射
    • JavaJUC
    • JavaJVM
  • Java容器

    • JavaWeb
  • Java版本新特性

    • Java新特性
  • SQL 数据库

    • MySQL
    • Oracle
  • NoSQL 数据库

    • Redis
    • ElasticSearch
  • 数据库

    • MyBatis
    • MyBatis-Plus
  • 消息中间件

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
  • 进阶服务

    • Nginx
  • Spring
  • Spring Boot
  • Spring Security
  • 设计模式
  • 算法
  • 知识
  • 管理

    • Maven
    • Git
  • 部署

    • Linux
    • Docker
    • Jenkins
    • Kubernetes
  • 进阶

    • TypeScript
  • 框架

    • React
    • Vue2
    • Vue3
  • 轮子工具
  • 项目工程
  • 友情链接
  • 本站

    • 分类
    • 标签
    • 归档
  • 我的

    • 收藏
    • 关于
    • Vue2-Admin (opens new window)
    • Vue3-Admin(完善) (opens new window)
GitHub (opens new window)
  • MyBatis

  • MyBatis-Plus

  • 中间件 - ActiveMQ

  • 中间件 - RabbitMQ

  • 中间件 - RocketMQ

  • 中间件 - Kafka

  • 高性能服务器 - Nginx

  • 响应式框架 - Reactor

    • Reactor - 快速上手
    • Reactor - 响应式编程
    • Reactor - 核心特征
    • Reactor - 对 Kotlin 的支持
    • Reactor - 单元测试
      • 测试
      • 使用 StepVerifier 来测试
      • 操控时间
      • 使用 StepVerifier 进行后校验
      • 测试 Context
      • 用 TestPublisher 手动发出元素
      • 用 PublisherProbe 检查执行路径
    • Reactor - 调试 Reactor
    • Reactor - 高级特性与概念
    • Reactor - 操作符总结
    • Reactor - 最佳实践
    • Reactor - 扩展性
  • 框架
  • 响应式框架 - Reactor
Young Kbt
2024-11-02
目录

Reactor - 单元测试

  • 测试
  • 使用 StepVerifier 来测试
  • 操控时间
  • 使用 StepVerifier 进行后校验
  • 测试 Context
  • 用 TestPublisher 手动发出元素
  • 用 PublisherProbe 检查执行路径

# 测试

无论你是编写了一个简单的 Reactor 操作链,还是开发了自定义的操作符,对它进行 自动化的测试总是一个好主意。

Reactor 内置一些专门用于测试的元素,放在一个专门的 artifact 里: reactor-test。 你可以在 on Github (opens new window) 的 reactor-core 库里找到这个项目。

如果要用它来进行测试,添加 scope 为 test 的依赖。

reactor-test 用 Maven 配置 <dependencies>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    // 1
</dependency>
1
2
3
4
5
6
  1. 如果你使用了 BOM,你不需要指定 <version>。

reactor-test 用 Gradle 配置 dependencies

dependencies {
   testCompile 'io.projectreactor:reactor-test'
}
1
2
3

reactor-test 的两个主要用途:

  • 使用 StepVerifier 一步一步地测试一个给定场景的序列
  • 使用 TestPublisher 生成数据来测试下游的操作符(包括你自己的operator)

# 使用 StepVerifier 来测试

最常见的测试 Reactor 序列的场景就是定义一个 Flux 或 Mono,然后在订阅它的时候测试它的行为。

当你的测试关注于每一次的事件的时候,就非常容易转化为使用 StepVerifier 的测试场景: 下一个期望的事件是什么?你是否期望使用 Flux 来发出一个特别的值?或者接下来 300ms 什么都不做?所有这些都可以使用 StepVerifier API 来表示。

例如,你可能会使用如下的工具方法来包装一个 Flux:

public <T> Flux<T> appendBoomError(Flux<T> source) {
  return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}
1
2
3

要测试它的话,你需要校验如下内容:

我希望这个 Flux 先发出 foo,然后发出 bar,然后 生成一个内容为 boom 的错误。 最后订阅并校验它们。

使用 StepVerifier API 来表示以上的验证过程:

@Test
public void testAppendBoomError() {
  Flux<String> source = Flux.just("foo", "bar"); // 1

  StepVerifier.create( // 2
    appendBoomError(source)) // 3
    .expectNext("foo") // 4
    .expectNext("bar")
    .expectErrorMessage("boom") // 5
    .verify(); // 6
}
1
2
3
4
5
6
7
8
9
10
11
  1. 由于被测试方法需要一个 Flux,定义一个简单的 Flux 用于测试
  2. 创建一个 StepVerifier 构造器来包装和校验一个 Flux
  3. 传进来需要测试的 Flux(即待测方法的返回结果)
  4. 第一个我们期望的信号是 onNext,它的值为 foo
  5. 最后我们期望的是一个终止信号 onError,异常内容应该为 boom
  6. 不要忘了使用 verify() 触发测试

API 是一个构造器,通过传入一个要测试的序列来创建一个 StepVerifier。从而你可以:

  • 表示你 期望 发生的下一个信号。如果收到其他信号(或者信号与期望不匹配),整个测试就会 失败(AssertionError)。例如你可能会用到 expectNext(T...) 或 expectNextCount(long)
  • 消费 下一个信号。当你想要跳过部分序列或者当你想对信号内容进行自定义的 assertion 的时候会用到它(比如要校验是否有一个 onNext 信号,并校验对应发出的元素是否是一个 size 为 5 的 List)。你可能会用到 consumeNextWith(Consumer<T>)
  • 更多样的操作 比如暂停或运行一段代码。比如,你想对测试状态或内容进行调整或处理, 你可能会用到 thenAwait(Duration) 和 then(Runnable)

对于终止事件,相应的期望方法(expectComplete()、expectError(),及其所有的变体方法) 使用之后就不能再继续增加别的期望方法了。最后你只能对 StepVerifier 进行一些额外的配置并 触发校验(通常调用 verify() 及其变体方法)。

从 StepVerifier 内部来看,它订阅了待测试的 Flux 或 Mono,然后将序列中的每个信号与测试 场景的期望进行比对。如果匹配的话,测试成功。如果有不匹配的情况,则抛出 AssertionError 异常。

请记住是 verify() 触发了校验过程。这个 API 还有一些结合了 verify() 与期望的终止信号 的方法:verifyComplete()、verifyError()、verifyErrorMessage(String) 等。

注意,如果有一个传入 lambda 的期望方法抛出了 AssertionError,会被报告为测试失败。 这可用于自定义 assertion。

默认情况下,verify() 方法(及同源的 verifyThenAssertThat、verifyComplete()等) 没有超时的概念。它可能会永远阻塞住。你可以使用 StepVerifier.setDefaultTimeout(Duration) 来设置一个全局的超时时间,或使用 verify(Duration) 指定。

# 操控时间

StepVerifier 可以用来测试基于时间的操作符,从而避免测试的长时间运行。可以使用构造器 StepVerifier.withVirtualTime 达到这一点。

示例如下:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... 继续追加期望方法
1
2

虚拟时间(virtual time) 的功能会在 Reactor 的调度器(Scheduler)工厂方法中插入一个自定义的 调度器。这些基于时间的操作符通常默认使用 Schedulers.parallel() 调度器。(虚拟时间的) 技巧在于使用一个 VirtualTimeScheduler 来代替默认调度器。然而一个重要的前提就是,只有在初始化 虚拟时间调度器之后的操作符才会起作用。

为了提高 StepVerifier 正常起作用的概率,它一般不接收一个简单的 Flux 作为输入,而是接收 一个 Supplier,从而可以在配置好订阅者 之后 「懒创建」待测试的 flux。

要注意的是,Supplier<Publisher<T>> 可用于「懒创建」,否则不能保证虚拟时间 能完全起作用。尤其要避免提前实例化 Flux,要在 Supplier 中用 lambda 创建并返回 Flux 变量。

有两种处理时间的期望方法,无论是否配置虚拟时间都是可用的:

  • thenAwait(Duration) 暂停校验步骤(允许信号延迟发出)
  • expectNoEvent(Duration) 同样让序列持续一定的时间,期间如果有 任何 信号发出则测试失败

两个方法都会基于给定的持续时间暂停线程的执行,如果是在虚拟时间模式下就相应地使用虚拟时间。

expectNoEvent 将订阅(subscription)也认作一个事件。假设你用它作为第一步,如果检测 到有订阅信号,也会失败。这时候可以使用 expectSubscription().expectNoEvent(duration) 来代替。

为了快速校验前边提到的 Mono.delay,我们可以这样完成代码:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription()
    .expectNoEvent(Duration.ofDays(1)) // 2
    .expectNext(0) // 3
    .verifyComplete(); // 4
1
2
3
4
5
  1. 期待一天内没有信号发生
  2. 然后期待一个 next 信号为 0
  3. 然后期待完成(同时触发校验)

我们也可以使用 thenAwait(Duration.ofDays(1)),但是 expectNoEvent 的好处是 能够验证在此之前不会发生什么。

注意 verify() 返回一个 Duration,这是整个测试的 真实时间。

虚拟时间并非银弹。请记住 所有的 调度器都会被替换为 VirtualTimeScheduler。 有些时候你可以锁定校验过程,因为虚拟时钟在遇到第一个期望校验之前并不会开始,所以对于 「无数据「的期望校验也必须能够运行在虚拟时间模式下。在无限序列中,虚拟时间模式的发挥 空间也很有限,因为它可能导致线程(序列的发出和校验的运行都在这个线程上)卡住。

# 使用 StepVerifier 进行后校验

当配置完你测试场景的最后的期望方法后,你可以使用 verifyThenAssertThat() 来代替 verify() 触发执行后的校验。

verifyThenAssertThat() 返回一个 StepVerifier.Assertions 对象,你可以用它来校验 整个测试场景成功刚结束后的一些状态(它也会调用 verify())。典型应用就是校验有多少 元素被操作符丢弃(参考 使用全局的 Hooks)。

# 测试 Context

更多关于 Context 的内容请参考 增加一个 Context 到响应式序列。

StepVerifier 有一些期望方法可以用来测试 Context:

  • expectAccessibleContext: 返回一个 ContextExpectations 对象,你可以用它来在 Context 上配置期望校验。一定记住要调用 then() 来返回到对序列的期望校验上来
  • expectNoAccessibleContext: 是对「没有Context」的校验。通常用于 被测试的 Publisher 并不是一个响应式的,或没有任何操作符能够传递 Context (比如一个 generate 的 Publisher)

此外,还可以用 StepVerifierOptions 方法传入一个测试用的初始 Context 给 StepVerifier, 从而可以创建一个校验(verifier)。

这些特性通过下边的代码演示:

StepVerifier.create(Mono.just(1).map(i -> i + 10),
                                StepVerifierOptions.create().withInitialContext(Context.of("foo", "bar"))) // 1
                            .expectAccessibleContext() // 2
                            .contains("foo", "bar") // 3
                            .then() // 4
                            .expectNext(11)
                            .verifyComplete(); // 5
1
2
3
4
5
6
7
  1. 使用 StepVerifierOptions 创建 StepVerifier 并传入初始 Context
  2. 开始对 Context 进行校验,这里只是确保 Context 正常传播了
  3. 对 Context 进行校验的例子:比如验证是否包含一个 "foo" - "bar" 键值对
  4. 使用 then() 切换回对序列的校验
  5. 不要忘了用 verify() 触发整个校验过程

# 用 TestPublisher 手动发出元素

对于更多高级的测试,如果能够完全掌控源发出的数据就会方便很多,因为这样就可以在测试的 时候更加有的放矢地发出想测的数据。

另一种情况就是你实现了自己的操作符,然后想校验它的行为——尤其是在源不稳定的时候——是否符合响应式流规范。

reactor-test 提供了 TestPublisher 类来应对这两种需求。这个类本质上是一个 Publisher<T>, 你可以通过可编程的方式来用它发出各种信号:

  • next(T) 以及 next(T, T...) 发出 1-n 个 onNext 信号
  • emit(T...) 起同样作用,并且会执行 complete()
  • complete() 会发出终止信号 onComplete
  • error(Throwable) 会发出终止信号 onError

使用 create 工厂方法就可以得到一个正常的 TestPublisher。而使用 createNonCompliant 工厂方法可以创建一个「不正常」的 TestPublisher。后者需要传入由 TestPublisher.Violation 枚举指定的一组选项,这些选项可用于告诉 publisher 忽略哪些问题。枚举值有:

  • REQUEST_OVERFLOW: 允许 next 在请求不足的时候也可以调用,而不会触发 IllegalStateException
  • ALLOW_NULL: 允许 next 能够发出一个 null 值而不会触发 NullPointerException
  • CLEANUP_ON_TERMINATE: 可以重复多次发出终止信号,包括 complete()、error() 和 emit()

最后,TestPublisher 还可以用不同的 assert* 来跟踪其内部的订阅状态。

使用转换方法 flux() 和 mono() 可以将其作为 Flux 和 Mono 来使用。

# 用 PublisherProbe 检查执行路径

当构建复杂的操作链时,可能会有多个子序列,从而导致多个执行路径。

多数时候,这些子序列会生成一个足够明确的 onNext 信号,你可以通过检查最终结果 来判断它是否执行。

考虑下边这个方法,它构建了一条操作链,并使用 switchIfEmpty 方法在源为空的情况下, 替换成另一个源。

public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
            .switchIfEmpty(fallback);
}
1
2
3
4
5

很容易就可以测试出 switchIfEmpty 的哪一个逻辑分支被使用了,如下:

@Test
public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a  phrase with    tabs!"),
            Mono.just("EMPTY_PHRASE")))
                .expectNext("just", "a", "phrase", "with", "tabs!")
                .verifyComplete();
}

@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
                .expectNext("EMPTY_PHRASE")
                .verifyComplete();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

但是如果例子中的方法返回的是一个 Mono<Void> 呢?它等待源发送结束,执行一个额外的任务, 然后就结束了。如果源是空的,则执行另一个备用的类似于 Runnable 的任务,如下:

private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");
}

public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
            .flatMap(command -> executeCommand(command).then()) // 1
            .switchIfEmpty(doWhenEmpty); // 2
}
1
2
3
4
5
6
7
8
9
  1. then() 方法会忽略 command,它只关心是否结束
  2. 两个都是空序列,这个时候如何区分(哪边执行了)呢?

为了验证执行路径是经过了 doWhenEmpty 的,你需要编写额外的代码,比如你需要一个这样的 Mono<Void>:

  • 能够捕获到它被订阅的事实
  • 以上事实需要在整个执行结束 之后 再进行验证

在 3.1 版本以前,你需要为每一种状态维护一个 AtomicBoolean 变量,然后在相应的 doOn* 回调中观察它的值。这需要添加不少的额外代码。好在,版本 3.1.0 之后可以使用 PublisherProbe 来做, 如下:

@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); // 1

    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) // 2
                .verifyComplete();

    probe.assertWasSubscribed(); // 3
    probe.assertWasRequested(); // 4
    probe.assertWasNotCancelled(); // 5
}
1
2
3
4
5
6
7
8
9
10
11
  1. 创建一个探针(probe),它会转化为一个空序列
  2. 在需要使用 Mono<Void> 的位置调用 probe.mono() 来替换为探针
  3. 序列结束之后,你可以用这个探针来判断序列是如何使用的,你可以检查是它从哪(条路径)被订阅的
  4. 对于请求也是一样的
  5. 以及是否被取消了

你也可以在使用 Flux<T> 的位置通过调用 .flux() 方法来放置探针。如果你既需要用探针检查执行路径 还需要它能够发出数据,你可以用 PublisherProbe.of(Publisher) 方法包装一个 Publisher<T> 来搞定

编辑此页 (opens new window)
#Reactor
更新时间: 2024/11/02, 09:43:06
Reactor - 对 Kotlin 的支持
Reactor - 调试 Reactor

← Reactor - 对 Kotlin 的支持 Reactor - 调试 Reactor→

最近更新
01
技术随笔 - Element Plus 修改包名 原创
11-02
02
Reactor - 扩展性
11-02
03
Reactor - 最佳实践
11-02
更多文章>
Theme by Vdoing | Copyright © 2021-2024 Young Kbt | blog
桂ICP备2021009994号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式