 Reactor - 单元测试
Reactor - 单元测试
  # 测试
无论你是编写了一个简单的 Reactor 操作链,还是开发了自定义的操作符,对它进行 自动化的测试总是一个好主意。
Reactor 内置一些专门用于测试的元素,放在一个专门的 artifact 里: reactor-test。 你可以在 on Github (opens new window) 的 reactor-core 库里找到这个项目。
如果要用它来进行测试,添加 scope 为 test 的依赖。
reactor-test 用 Maven 配置 <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    // 1
</dependency>
2
3
4
5
6
- 如果你使用了 BOM,你不需要指定 <version>。
reactor-test 用 Gradle 配置 dependencies
dependencies {
   testCompile 'io.projectreactor:reactor-test'
}
2
3
reactor-test 的两个主要用途:
- 使用 StepVerifier一步一步地测试一个给定场景的序列
- 使用 TestPublisher生成数据来测试下游的操作符(包括你自己的operator)
# 使用 StepVerifier 来测试
 最常见的测试 Reactor 序列的场景就是定义一个 Flux 或 Mono,然后在订阅它的时候测试它的行为。
当你的测试关注于每一次的事件的时候,就非常容易转化为使用 StepVerifier 的测试场景: 下一个期望的事件是什么?你是否期望使用 Flux 来发出一个特别的值?或者接下来 300ms 什么都不做?所有这些都可以使用 StepVerifier API 来表示。
例如,你可能会使用如下的工具方法来包装一个 Flux:
public <T> Flux<T> appendBoomError(Flux<T> source) {
  return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}
2
3
要测试它的话,你需要校验如下内容:
我希望这个
Flux先发出foo,然后发出bar,然后 生成一个内容为boom的错误。 最后订阅并校验它们。
使用 StepVerifier API 来表示以上的验证过程:
@Test
public void testAppendBoomError() {
  Flux<String> source = Flux.just("foo", "bar"); // 1
  StepVerifier.create( // 2
    appendBoomError(source)) // 3
    .expectNext("foo") // 4
    .expectNext("bar")
    .expectErrorMessage("boom") // 5
    .verify(); // 6
}
2
3
4
5
6
7
8
9
10
11
- 由于被测试方法需要一个 Flux,定义一个简单的Flux用于测试
- 创建一个 StepVerifier构造器来包装和校验一个Flux
- 传进来需要测试的 Flux(即待测方法的返回结果)
- 第一个我们期望的信号是 onNext,它的值为foo
- 最后我们期望的是一个终止信号 onError,异常内容应该为boom
- 不要忘了使用 verify()触发测试
API 是一个构造器,通过传入一个要测试的序列来创建一个 StepVerifier。从而你可以:
- 表示你 期望 发生的下一个信号。如果收到其他信号(或者信号与期望不匹配),整个测试就会 失败(AssertionError)。例如你可能会用到expectNext(T...)或expectNextCount(long)
- 消费 下一个信号。当你想要跳过部分序列或者当你想对信号内容进行自定义的 assertion的时候会用到它(比如要校验是否有一个onNext信号,并校验对应发出的元素是否是一个 size 为 5 的 List)。你可能会用到consumeNextWith(Consumer<T>)
- 更多样的操作 比如暂停或运行一段代码。比如,你想对测试状态或内容进行调整或处理, 你可能会用到 thenAwait(Duration)和then(Runnable)
对于终止事件,相应的期望方法(expectComplete()、expectError(),及其所有的变体方法) 使用之后就不能再继续增加别的期望方法了。最后你只能对 StepVerifier 进行一些额外的配置并 触发校验(通常调用 verify() 及其变体方法)。
从 StepVerifier 内部来看,它订阅了待测试的 Flux 或 Mono,然后将序列中的每个信号与测试 场景的期望进行比对。如果匹配的话,测试成功。如果有不匹配的情况,则抛出 AssertionError 异常。
请记住是
verify()触发了校验过程。这个 API 还有一些结合了verify()与期望的终止信号 的方法:verifyComplete()、verifyError()、verifyErrorMessage(String)等。
注意,如果有一个传入 lambda 的期望方法抛出了 AssertionError,会被报告为测试失败。 这可用于自定义 assertion。
默认情况下,
verify()方法(及同源的verifyThenAssertThat、verifyComplete()等) 没有超时的概念。它可能会永远阻塞住。你可以使用StepVerifier.setDefaultTimeout(Duration)来设置一个全局的超时时间,或使用verify(Duration)指定。
# 操控时间
StepVerifier 可以用来测试基于时间的操作符,从而避免测试的长时间运行。可以使用构造器 StepVerifier.withVirtualTime 达到这一点。
示例如下:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... 继续追加期望方法
2
虚拟时间(virtual time) 的功能会在 Reactor 的调度器(Scheduler)工厂方法中插入一个自定义的 调度器。这些基于时间的操作符通常默认使用 Schedulers.parallel() 调度器。(虚拟时间的) 技巧在于使用一个 VirtualTimeScheduler 来代替默认调度器。然而一个重要的前提就是,只有在初始化 虚拟时间调度器之后的操作符才会起作用。
为了提高 StepVerifier 正常起作用的概率,它一般不接收一个简单的 Flux 作为输入,而是接收 一个 Supplier,从而可以在配置好订阅者 之后 「懒创建」待测试的 flux。
要注意的是,
Supplier<Publisher<T>>可用于「懒创建」,否则不能保证虚拟时间 能完全起作用。尤其要避免提前实例化Flux,要在Supplier中用 lambda 创建并返回Flux变量。
有两种处理时间的期望方法,无论是否配置虚拟时间都是可用的:
- thenAwait(Duration)暂停校验步骤(允许信号延迟发出)
- expectNoEvent(Duration)同样让序列持续一定的时间,期间如果有 任何 信号发出则测试失败
两个方法都会基于给定的持续时间暂停线程的执行,如果是在虚拟时间模式下就相应地使用虚拟时间。
expectNoEvent将订阅(subscription)也认作一个事件。假设你用它作为第一步,如果检测 到有订阅信号,也会失败。这时候可以使用expectSubscription().expectNoEvent(duration)来代替。
为了快速校验前边提到的 Mono.delay,我们可以这样完成代码:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription()
    .expectNoEvent(Duration.ofDays(1)) // 2
    .expectNext(0) // 3
    .verifyComplete(); // 4
2
3
4
5
- 期待一天内没有信号发生
- 然后期待一个 next 信号为 0
- 然后期待完成(同时触发校验)
我们也可以使用 thenAwait(Duration.ofDays(1)),但是 expectNoEvent 的好处是 能够验证在此之前不会发生什么。
注意 verify() 返回一个 Duration,这是整个测试的 真实时间。
虚拟时间并非银弹。请记住 所有的 调度器都会被替换为
VirtualTimeScheduler。 有些时候你可以锁定校验过程,因为虚拟时钟在遇到第一个期望校验之前并不会开始,所以对于 「无数据「的期望校验也必须能够运行在虚拟时间模式下。在无限序列中,虚拟时间模式的发挥 空间也很有限,因为它可能导致线程(序列的发出和校验的运行都在这个线程上)卡住。
# 使用 StepVerifier 进行后校验
 当配置完你测试场景的最后的期望方法后,你可以使用 verifyThenAssertThat() 来代替 verify() 触发执行后的校验。
verifyThenAssertThat() 返回一个 StepVerifier.Assertions 对象,你可以用它来校验 整个测试场景成功刚结束后的一些状态(它也会调用 verify())。典型应用就是校验有多少 元素被操作符丢弃(参考 使用全局的 Hooks)。
# 测试 Context
 更多关于 Context 的内容请参考 增加一个 Context 到响应式序列。
StepVerifier 有一些期望方法可以用来测试 Context:
- expectAccessibleContext: 返回一个- ContextExpectations对象,你可以用它来在- Context上配置期望校验。一定记住要调用- then()来返回到对序列的期望校验上来
- expectNoAccessibleContext: 是对「没有- Context」的校验。通常用于 被测试的- Publisher并不是一个响应式的,或没有任何操作符能够传递- Context(比如一个- generate的- Publisher)
此外,还可以用 StepVerifierOptions 方法传入一个测试用的初始 Context 给 StepVerifier, 从而可以创建一个校验(verifier)。
这些特性通过下边的代码演示:
StepVerifier.create(Mono.just(1).map(i -> i + 10),
                                StepVerifierOptions.create().withInitialContext(Context.of("foo", "bar"))) // 1
                            .expectAccessibleContext() // 2
                            .contains("foo", "bar") // 3
                            .then() // 4
                            .expectNext(11)
                            .verifyComplete(); // 5
2
3
4
5
6
7
- 使用 StepVerifierOptions创建StepVerifier并传入初始Context
- 开始对 Context进行校验,这里只是确保Context正常传播了
- 对 Context进行校验的例子:比如验证是否包含一个 "foo" - "bar" 键值对
- 使用 then()切换回对序列的校验
- 不要忘了用 verify()触发整个校验过程
# 用 TestPublisher 手动发出元素
 对于更多高级的测试,如果能够完全掌控源发出的数据就会方便很多,因为这样就可以在测试的 时候更加有的放矢地发出想测的数据。
另一种情况就是你实现了自己的操作符,然后想校验它的行为——尤其是在源不稳定的时候——是否符合响应式流规范。
reactor-test 提供了 TestPublisher 类来应对这两种需求。这个类本质上是一个 Publisher<T>, 你可以通过可编程的方式来用它发出各种信号:
- next(T)以及- next(T, T...)发出 1-n 个- onNext信号
- emit(T...)起同样作用,并且会执行- complete()
- complete()会发出终止信号- onComplete
- error(Throwable)会发出终止信号- onError
使用 create 工厂方法就可以得到一个正常的 TestPublisher。而使用 createNonCompliant 工厂方法可以创建一个「不正常」的 TestPublisher。后者需要传入由 TestPublisher.Violation 枚举指定的一组选项,这些选项可用于告诉 publisher 忽略哪些问题。枚举值有:
- REQUEST_OVERFLOW: 允许- next在请求不足的时候也可以调用,而不会触发- IllegalStateException
- ALLOW_NULL: 允许- next能够发出一个- null值而不会触发- NullPointerException
- CLEANUP_ON_TERMINATE: 可以重复多次发出终止信号,包括- complete()、- error()和- emit()
最后,TestPublisher 还可以用不同的 assert* 来跟踪其内部的订阅状态。
使用转换方法 flux() 和 mono() 可以将其作为 Flux 和 Mono 来使用。
# 用 PublisherProbe 检查执行路径
 当构建复杂的操作链时,可能会有多个子序列,从而导致多个执行路径。
多数时候,这些子序列会生成一个足够明确的 onNext 信号,你可以通过检查最终结果 来判断它是否执行。
考虑下边这个方法,它构建了一条操作链,并使用 switchIfEmpty 方法在源为空的情况下, 替换成另一个源。
public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
            .switchIfEmpty(fallback);
}
2
3
4
5
很容易就可以测试出 switchIfEmpty 的哪一个逻辑分支被使用了,如下:
@Test
public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a  phrase with    tabs!"),
            Mono.just("EMPTY_PHRASE")))
                .expectNext("just", "a", "phrase", "with", "tabs!")
                .verifyComplete();
}
@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
                .expectNext("EMPTY_PHRASE")
                .verifyComplete();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
但是如果例子中的方法返回的是一个 Mono<Void> 呢?它等待源发送结束,执行一个额外的任务, 然后就结束了。如果源是空的,则执行另一个备用的类似于 Runnable 的任务,如下:
private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");
}
public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
            .flatMap(command -> executeCommand(command).then()) // 1
            .switchIfEmpty(doWhenEmpty); // 2
}
2
3
4
5
6
7
8
9
- then()方法会忽略 command,它只关心是否结束
- 两个都是空序列,这个时候如何区分(哪边执行了)呢?
为了验证执行路径是经过了 doWhenEmpty 的,你需要编写额外的代码,比如你需要一个这样的 Mono<Void>:
- 能够捕获到它被订阅的事实
- 以上事实需要在整个执行结束 之后 再进行验证
在 3.1 版本以前,你需要为每一种状态维护一个 AtomicBoolean 变量,然后在相应的 doOn* 回调中观察它的值。这需要添加不少的额外代码。好在,版本 3.1.0 之后可以使用 PublisherProbe 来做, 如下:
@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); // 1
    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) // 2
                .verifyComplete();
    probe.assertWasSubscribed(); // 3
    probe.assertWasRequested(); // 4
    probe.assertWasNotCancelled(); // 5
}
2
3
4
5
6
7
8
9
10
11
- 创建一个探针(probe),它会转化为一个空序列
- 在需要使用 Mono<Void>的位置调用probe.mono()来替换为探针
- 序列结束之后,你可以用这个探针来判断序列是如何使用的,你可以检查是它从哪(条路径)被订阅的
- 对于请求也是一样的
- 以及是否被取消了
你也可以在使用 Flux<T> 的位置通过调用 .flux() 方法来放置探针。如果你既需要用探针检查执行路径 还需要它能够发出数据,你可以用 PublisherProbe.of(Publisher) 方法包装一个 Publisher<T> 来搞定
