Reactor - 单元测试
# 测试
无论你是编写了一个简单的 Reactor 操作链,还是开发了自定义的操作符,对它进行 自动化的测试总是一个好主意。
Reactor 内置一些专门用于测试的元素,放在一个专门的 artifact 里: reactor-test
。 你可以在 on Github (opens new window) 的 reactor-core 库里找到这个项目。
如果要用它来进行测试,添加 scope 为 test 的依赖。
reactor-test 用 Maven 配置 <dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
// 1
</dependency>
2
3
4
5
6
- 如果你使用了 BOM,你不需要指定
<version>
。
reactor-test 用 Gradle 配置 dependencies
dependencies {
testCompile 'io.projectreactor:reactor-test'
}
2
3
reactor-test
的两个主要用途:
- 使用
StepVerifier
一步一步地测试一个给定场景的序列 - 使用
TestPublisher
生成数据来测试下游的操作符(包括你自己的operator)
# 使用 StepVerifier
来测试
最常见的测试 Reactor 序列的场景就是定义一个 Flux
或 Mono
,然后在订阅它的时候测试它的行为。
当你的测试关注于每一次的事件的时候,就非常容易转化为使用 StepVerifier
的测试场景: 下一个期望的事件是什么?你是否期望使用 Flux
来发出一个特别的值?或者接下来 300ms 什么都不做?所有这些都可以使用 StepVerifier
API 来表示。
例如,你可能会使用如下的工具方法来包装一个 Flux
:
public <T> Flux<T> appendBoomError(Flux<T> source) {
return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}
2
3
要测试它的话,你需要校验如下内容:
我希望这个
Flux
先发出foo
,然后发出bar
,然后 生成一个内容为boom
的错误。 最后订阅并校验它们。
使用 StepVerifier
API 来表示以上的验证过程:
@Test
public void testAppendBoomError() {
Flux<String> source = Flux.just("foo", "bar"); // 1
StepVerifier.create( // 2
appendBoomError(source)) // 3
.expectNext("foo") // 4
.expectNext("bar")
.expectErrorMessage("boom") // 5
.verify(); // 6
}
2
3
4
5
6
7
8
9
10
11
- 由于被测试方法需要一个
Flux
,定义一个简单的Flux
用于测试 - 创建一个
StepVerifier
构造器来包装和校验一个Flux
- 传进来需要测试的
Flux
(即待测方法的返回结果) - 第一个我们期望的信号是
onNext
,它的值为foo
- 最后我们期望的是一个终止信号
onError
,异常内容应该为boom
- 不要忘了使用
verify()
触发测试
API 是一个构造器,通过传入一个要测试的序列来创建一个 StepVerifier
。从而你可以:
- 表示你 期望 发生的下一个信号。如果收到其他信号(或者信号与期望不匹配),整个测试就会 失败(
AssertionError
)。例如你可能会用到expectNext(T...)
或expectNextCount(long)
- 消费 下一个信号。当你想要跳过部分序列或者当你想对信号内容进行自定义的
assertion
的时候会用到它(比如要校验是否有一个onNext
信号,并校验对应发出的元素是否是一个 size 为 5 的 List)。你可能会用到consumeNextWith(Consumer<T>)
- 更多样的操作 比如暂停或运行一段代码。比如,你想对测试状态或内容进行调整或处理, 你可能会用到
thenAwait(Duration)
和then(Runnable)
对于终止事件,相应的期望方法(expectComplete()
、expectError()
,及其所有的变体方法) 使用之后就不能再继续增加别的期望方法了。最后你只能对 StepVerifier
进行一些额外的配置并 触发校验(通常调用 verify()
及其变体方法)。
从 StepVerifier 内部来看,它订阅了待测试的 Flux
或 Mono
,然后将序列中的每个信号与测试 场景的期望进行比对。如果匹配的话,测试成功。如果有不匹配的情况,则抛出 AssertionError
异常。
请记住是
verify()
触发了校验过程。这个 API 还有一些结合了verify()
与期望的终止信号 的方法:verifyComplete()
、verifyError()
、verifyErrorMessage(String)
等。
注意,如果有一个传入 lambda 的期望方法抛出了 AssertionError
,会被报告为测试失败。 这可用于自定义 assertion。
默认情况下,
verify()
方法(及同源的verifyThenAssertThat
、verifyComplete()
等) 没有超时的概念。它可能会永远阻塞住。你可以使用StepVerifier.setDefaultTimeout(Duration)
来设置一个全局的超时时间,或使用verify(Duration)
指定。
# 操控时间
StepVerifier
可以用来测试基于时间的操作符,从而避免测试的长时间运行。可以使用构造器 StepVerifier.withVirtualTime
达到这一点。
示例如下:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... 继续追加期望方法
2
虚拟时间(virtual time) 的功能会在 Reactor 的调度器(Scheduler
)工厂方法中插入一个自定义的 调度器。这些基于时间的操作符通常默认使用 Schedulers.parallel()
调度器。(虚拟时间的) 技巧在于使用一个 VirtualTimeScheduler
来代替默认调度器。然而一个重要的前提就是,只有在初始化 虚拟时间调度器之后的操作符才会起作用。
为了提高 StepVerifier
正常起作用的概率,它一般不接收一个简单的 Flux
作为输入,而是接收 一个 Supplier
,从而可以在配置好订阅者 之后 「懒创建」待测试的 flux。
要注意的是,
Supplier<Publisher<T>>
可用于「懒创建」,否则不能保证虚拟时间 能完全起作用。尤其要避免提前实例化Flux
,要在Supplier
中用 lambda 创建并返回Flux
变量。
有两种处理时间的期望方法,无论是否配置虚拟时间都是可用的:
thenAwait(Duration)
暂停校验步骤(允许信号延迟发出)expectNoEvent(Duration)
同样让序列持续一定的时间,期间如果有 任何 信号发出则测试失败
两个方法都会基于给定的持续时间暂停线程的执行,如果是在虚拟时间模式下就相应地使用虚拟时间。
expectNoEvent
将订阅(subscription
)也认作一个事件。假设你用它作为第一步,如果检测 到有订阅信号,也会失败。这时候可以使用expectSubscription().expectNoEvent(duration)
来代替。
为了快速校验前边提到的 Mono.delay
,我们可以这样完成代码:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
.expectSubscription()
.expectNoEvent(Duration.ofDays(1)) // 2
.expectNext(0) // 3
.verifyComplete(); // 4
2
3
4
5
- 期待一天内没有信号发生
- 然后期待一个 next 信号为
0
- 然后期待完成(同时触发校验)
我们也可以使用 thenAwait(Duration.ofDays(1))
,但是 expectNoEvent
的好处是 能够验证在此之前不会发生什么。
注意 verify()
返回一个 Duration
,这是整个测试的 真实时间。
虚拟时间并非银弹。请记住 所有的 调度器都会被替换为
VirtualTimeScheduler
。 有些时候你可以锁定校验过程,因为虚拟时钟在遇到第一个期望校验之前并不会开始,所以对于 「无数据「的期望校验也必须能够运行在虚拟时间模式下。在无限序列中,虚拟时间模式的发挥 空间也很有限,因为它可能导致线程(序列的发出和校验的运行都在这个线程上)卡住。
# 使用 StepVerifier
进行后校验
当配置完你测试场景的最后的期望方法后,你可以使用 verifyThenAssertThat()
来代替 verify()
触发执行后的校验。
verifyThenAssertThat()
返回一个 StepVerifier.Assertions
对象,你可以用它来校验 整个测试场景成功刚结束后的一些状态(它也会调用 verify()
)。典型应用就是校验有多少 元素被操作符丢弃(参考 使用全局的 Hooks)。
# 测试 Context
更多关于 Context
的内容请参考 增加一个 Context 到响应式序列。
StepVerifier
有一些期望方法可以用来测试 Context
:
expectAccessibleContext
: 返回一个ContextExpectations
对象,你可以用它来在Context
上配置期望校验。一定记住要调用then()
来返回到对序列的期望校验上来expectNoAccessibleContext
: 是对「没有Context
」的校验。通常用于 被测试的Publisher
并不是一个响应式的,或没有任何操作符能够传递Context
(比如一个generate
的Publisher
)
此外,还可以用 StepVerifierOptions
方法传入一个测试用的初始 Context
给 StepVerifier
, 从而可以创建一个校验(verifier)。
这些特性通过下边的代码演示:
StepVerifier.create(Mono.just(1).map(i -> i + 10),
StepVerifierOptions.create().withInitialContext(Context.of("foo", "bar"))) // 1
.expectAccessibleContext() // 2
.contains("foo", "bar") // 3
.then() // 4
.expectNext(11)
.verifyComplete(); // 5
2
3
4
5
6
7
- 使用
StepVerifierOptions
创建StepVerifier
并传入初始Context
- 开始对
Context
进行校验,这里只是确保Context
正常传播了 - 对
Context
进行校验的例子:比如验证是否包含一个 "foo" - "bar" 键值对 - 使用
then()
切换回对序列的校验 - 不要忘了用
verify()
触发整个校验过程
# 用 TestPublisher
手动发出元素
对于更多高级的测试,如果能够完全掌控源发出的数据就会方便很多,因为这样就可以在测试的 时候更加有的放矢地发出想测的数据。
另一种情况就是你实现了自己的操作符,然后想校验它的行为——尤其是在源不稳定的时候——是否符合响应式流规范。
reactor-test
提供了 TestPublisher
类来应对这两种需求。这个类本质上是一个 Publisher<T>
, 你可以通过可编程的方式来用它发出各种信号:
next(T)
以及next(T, T...)
发出 1-n 个onNext
信号emit(T...)
起同样作用,并且会执行complete()
complete()
会发出终止信号onComplete
error(Throwable)
会发出终止信号onError
使用 create
工厂方法就可以得到一个正常的 TestPublisher
。而使用 createNonCompliant
工厂方法可以创建一个「不正常」的 TestPublisher
。后者需要传入由 TestPublisher.Violation
枚举指定的一组选项,这些选项可用于告诉 publisher 忽略哪些问题。枚举值有:
REQUEST_OVERFLOW
: 允许next
在请求不足的时候也可以调用,而不会触发IllegalStateException
ALLOW_NULL
: 允许next
能够发出一个null
值而不会触发NullPointerException
CLEANUP_ON_TERMINATE
: 可以重复多次发出终止信号,包括complete()
、error()
和emit()
最后,TestPublisher
还可以用不同的 assert*
来跟踪其内部的订阅状态。
使用转换方法 flux()
和 mono()
可以将其作为 Flux
和 Mono
来使用。
# 用 PublisherProbe
检查执行路径
当构建复杂的操作链时,可能会有多个子序列,从而导致多个执行路径。
多数时候,这些子序列会生成一个足够明确的 onNext
信号,你可以通过检查最终结果 来判断它是否执行。
考虑下边这个方法,它构建了一条操作链,并使用 switchIfEmpty
方法在源为空的情况下, 替换成另一个源。
public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
return source
.flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
.switchIfEmpty(fallback);
}
2
3
4
5
很容易就可以测试出 switchIfEmpty 的哪一个逻辑分支被使用了,如下:
@Test
public void testSplitPathIsUsed() {
StepVerifier.create(processOrFallback(Mono.just("just a phrase with tabs!"),
Mono.just("EMPTY_PHRASE")))
.expectNext("just", "a", "phrase", "with", "tabs!")
.verifyComplete();
}
@Test
public void testEmptyPathIsUsed() {
StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
.expectNext("EMPTY_PHRASE")
.verifyComplete();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
但是如果例子中的方法返回的是一个 Mono<Void>
呢?它等待源发送结束,执行一个额外的任务, 然后就结束了。如果源是空的,则执行另一个备用的类似于 Runnable 的任务,如下:
private Mono<String> executeCommand(String command) {
return Mono.just(command + " DONE");
}
public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
return commandSource
.flatMap(command -> executeCommand(command).then()) // 1
.switchIfEmpty(doWhenEmpty); // 2
}
2
3
4
5
6
7
8
9
then()
方法会忽略 command,它只关心是否结束- 两个都是空序列,这个时候如何区分(哪边执行了)呢?
为了验证执行路径是经过了 doWhenEmpty
的,你需要编写额外的代码,比如你需要一个这样的 Mono<Void>
:
- 能够捕获到它被订阅的事实
- 以上事实需要在整个执行结束 之后 再进行验证
在 3.1 版本以前,你需要为每一种状态维护一个 AtomicBoolean
变量,然后在相应的 doOn*
回调中观察它的值。这需要添加不少的额外代码。好在,版本 3.1.0 之后可以使用 PublisherProbe
来做, 如下:
@Test
public void testCommandEmptyPathIsUsed() {
PublisherProbe<Void> probe = PublisherProbe.empty(); // 1
StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) // 2
.verifyComplete();
probe.assertWasSubscribed(); // 3
probe.assertWasRequested(); // 4
probe.assertWasNotCancelled(); // 5
}
2
3
4
5
6
7
8
9
10
11
- 创建一个探针(probe),它会转化为一个空序列
- 在需要使用
Mono<Void>
的位置调用probe.mono()
来替换为探针 - 序列结束之后,你可以用这个探针来判断序列是如何使用的,你可以检查是它从哪(条路径)被订阅的
- 对于请求也是一样的
- 以及是否被取消了
你也可以在使用 Flux<T>
的位置通过调用 .flux()
方法来放置探针。如果你既需要用探针检查执行路径 还需要它能够发出数据,你可以用 PublisherProbe.of(Publisher)
方法包装一个 Publisher<T>
来搞定