Reactor - 调试 Reactor
# 调试 Reactor
从命令式和同步式编程切换到响应式和异步式编程有时候是令人生畏的。 学习曲线中最陡峭的异步就是出错时如何分析和调试。
在命令式世界,调试通常都是非常直观的:直接看 stack trace 就可以找到问题出现的位置, 以及:是否问题责任全部出在你自己的代码?问题是不是发生在某些库代码?如果是, 那你的哪部分代码调用了库,是不是传参不合适导致的问题?
# 典型的 Reactor Stack Trace
当你切换到异步代码,事情就变得复杂的多了。
看一下下边的 stack trace:
一段典型的 Reactor stack trace
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.emitScalar(FluxFlatMap.java:380)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:349)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:119)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:144)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:99)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:172)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:316)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:94)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:67)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:98)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668)
at reactor.core.publisher.Mono.subscribe(Mono.java:2629)
at reactor.core.publisher.Mono.subscribe(Mono.java:2604)
at reactor.core.publisher.Mono.subscribe(Mono.java:2582)
at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:722)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
这里边有好多信息,我们得到了一个 IndexOutOfBoundsException
,内容是 "源发出了 不止一个元素"。
我们也许可以很快假定这个源是一个 Flux/Mono,并通过下一行提到的 MonoSingle
确定是 Mono。 看上去是来自一个 single
操作符的抱怨。
查看 Javadoc 中关于操作符 Mono#single
的说明,我们看到 single
有一个规定: 源必须只能发出一个元素。看来是有一个源发出了多于一个元素,从而违反了这一规定。
我们可以更进一步找出那个源吗?下边的这些内容帮不上什么忙,只是打印了一些内部的似乎是一个响应式链的信息, 主要是一些 subscribe
和 request
的调用。
粗略过一下这些行,我们至少可以勾画出一个大致的出问题的链:大概涉及一个 MonoSingle
、一个 FluxFlatMap
,以及一个 FluxRange
(每一个都对应 trace 中的几行,但总体涉及这三个类)。 所以难道是 range().flatMap().single()
这样的链?
但是如果在我们的应用中多处都用到这一模式,那怎么办?通过这些还是不能确定什么, 搜索 single
也找不到问题所在。最后一行指向了我们的代码。我们似乎终于接近真相了。
不过,等等… 当我们找到源码文件,我们只能找到一个已存在的 Flux
被订阅了,如下:
toDebug.subscribe(System.out::println, Throwable::printStackTrace);
所有这些都发生在订阅时,但是 Flux
本身没有在这里 声明 。更糟的是, 当我们找到变量声明的地方,我们看到:
public Mono<String> toDebug; //请忽略 public 的属性
变量声明的地方并没有 实例化 。我们必须做最坏的打算,那就是在这个应用中, 可能在几个不同的代码路径上对这个变量赋了值,但我们不确定是哪一个导致了问题。
这是一种 Reactor 运行时错误,而不是编译错误。
我们希望找到的是操作符在哪里添加到操作链上的 —— 也就是 Flux
在哪里 声明的。我们通常说这个 Flux
是被 组装(assembly) 的。
# 开启调试模式
即便 stack trace 能够对有些许经验的开发者传递一些信息,但是在一些复杂的情况下, 这并不是一种理想的方式。
幸运的是,Reactor 内置了一种面向调试的能力—— 操作期测量(assembly-time instrumentation)。
这通过 在应用启动的时候 (或至少在有问题的 Flux
或 Mono
实例化之前) 加入自定义的 Hook.onOperator
钩子(hook),如下:
Hooks.onOperatorDebug();
这行代码——通过包装操作符的构造方法,并在此捕捉 stack trace——来监测对这个 Flux
(或 Mono
)的操作符的调用(也就是「组装」链的地方)。由于这些在 操作链被声明的地方就搞定,这个 hook 应该在 早于 声明的时候被激活, 最保险的方式就是在你程序的最开始就激活它。
之后,如果发生了异常,导致失败的操作符能够找到捕捉点并补充 stack trace。
在下一小节,我们看一下 stack trace 会有什么不同,以及如何对其进行分析。
# 阅读调试模式的 Stack Trace
我们在对上边的例子激活 operatorStacktrace
调试功能后,stack trace 如下:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:314) // 1
...
// 2
...
at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668)
at reactor.core.publisher.Mono.subscribe(Mono.java:2629)
at reactor.core.publisher.Mono.subscribe(Mono.java:2604)
at reactor.core.publisher.Mono.subscribe(Mono.java:2582)
at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:727)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: // 3
Assembly trace from producer [reactor.core.publisher.MonoSingle] : // 4
reactor.core.publisher.Flux.single(Flux.java:5335)
reactor.guide.GuideTests.scatterAndGather(GuideTests.java:689)
reactor.guide.GuideTests.populateDebug(GuideTests.java:702)
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
org.junit.rules.RunRules.evaluate(RunRules.java:20)
Error has been observed by the following operator(s): // 5
|_ Flux.single(TestWatcher.java:55) // 6
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- 这一条是新的:可以发现外层操作符捕捉到了 stack trace
- 第一部分的 stack trace 多数与上边一样,显示了操作符内部的信息(所以省略了这一块)
- 从这里开始,是在调试模式下显示的内容
- 首先我们获得了关于操作符组装的信息
- 以及错误沿着操作链传播的轨迹(从错误点到订阅点)
- 每一个看到这个错误的操作符都会列出,包括类和行信息。如果操作符是在 Reactor 源码内部组装的,行信息会被忽略
可见,捕获的 stack trace 作为 OnAssemblyException
添加到原始错误信息的之后。有两部分, 但是第一部分更加有意思。它显示了操作符触发异常的路径。这里显示的是 scatterAndGather
方法中的 single
导致的问题,而 scatterAndGather
方法是在 JUnit 中被 populateDebug
方法调用的。
既然我们已经有足够的信息来查出罪魁祸首,我们就来看一下 scatterAndGather
方法吧:
private Mono<String> scatterAndGather(Flux<String> urls) {
return urls.flatMap(url -> doRequest(url))
.single(); // 1
}
2
3
4
- 找到了,就是这个
single
现在我们可以发现错误的根源是将多个 HTTP 请求转化为 URLs 的 flatMap
方法后边接的是 single
, 这太严格了。使用 git blame
找到代码作者,并同他讨论过后,发现他是本来是想用不那么严格的 take(1)
方法的。
我们解决了问题。
错误被以下这些操作符观察(observed)了:
调试信息的第二部分在这个例子中意义不大,因为错误实际发生在最后一个操作符上(离 subscribe
最近的一个)。 另一个例子可能更加清楚:
FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
.transform(FakeUtils1.applyFilters)
.transform(FakeUtils2.enrichUser)
.blockLast();
2
3
4
现在想象一下在 findAllUserByName
内部有个 map
方法报错了。我们可能会看到如下的 trace:
Error has been observed by the following operator(s):
|_ Flux.map(FakeRepository.java:27)
|_ Flux.map(FakeRepository.java:28)
|_ Flux.filter(FakeUtils1.java:29)
|_ Flux.transform(GuideDebuggingExtraTests.java:41)
|_ Flux.elapsed(FakeUtils2.java:30)
|_ Flux.transform(GuideDebuggingExtraTests.java:42)
2
3
4
5
6
7
这与链上收到错误通知的操作符是一致:
- 异常源自第一个
map
- 被第二个
map
看到(都在findAllUserByName
方法中) - 接着被一个
filter
和一个transform
看到,说明链的这部分是由一个可重复使用的转换方法组装的 (这里是applyFilters
工具方法) - 最后被一个
elapsed
和一个transform
看到,类似的,elapsed
由第二个转换方法(enrichUser
) 组装
用这种形式的检测方式构造 stack trace 是成本较高的。也因此这种调试模式作为最终大招, 只应该在可控的方式下激活。
# 用 checkpoint()
方式替代
调试模式是全局性的,会影响到程序中每一个组装到一个 Flux
或 Mono
的操作符。好处在于可以进行 事后调试(after-the-fact debugging):无论错误是什么,我们都会得到足够的调试信息。
就像前边见到的那样,这种全局性的调试会因为成本较高而影响性能(其影响在于生成的 stack traces 数量)。 如果我们能大概定位到疑似出问题的操作符的话就可以不用花那么大的成本。然而,问题出现后, 我们通常无法定位到哪一个操作符可能存在问题,因为缺少一些 trace 信息,我们得修改代码, 打开调试模式,期望能够复现问题。
这种情况下,我们需要切换到调试模式,并进行一些必要的准备工作以便能够更好的发现复现的问题, 并捕捉到所有的信息。
如果你能确定是在你的代码中组装的响应式链存在问题,而且程序的可服务性又是很重要的, 那么你可以 使用 checkpoint()
操作符,它有两种调试技术可用。
你可以把这个操作符加到链中。这时 checkpoint
操作符就像是一个 hook,但只对它所在的链起作用。
还有一个 checkpoint(String)
的方法变体,你可以传入一个独特的字符串以方便在 assembly traceback 中识别信息。 这样会省略 stack trace,你可以依赖这个字符串(以下改称「定位描述符」)来定位到组装点。checkpoint(String)
比 checkpoint
有更低的执行成本。
checkpoint(String)
在它的输出中包含 "light" (可以方便用于搜索),如下所示:
...
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly site of producer [reactor.core.publisher.FluxElapsed] is identified by light checkpoint [light checkpoint identifier].
2
3
最后的但同样重要的是,如果你既想通过 checkpoint 添加定位描述符,同时又依赖于 stack trace 来定位组装点,你可以使用 checkpoint("description", true)
来实现这一点。这时回溯信息又出来了, 同时附加了定位描述符,如下例所示:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : // 1
reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:174)
reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescription(FluxOnAssemblyTest.java:159)
Error has been observed by the following operator(s):
|_ ParallelFlux.checkpointnull
2
3
4
5
6
descriptionCorrelation1234
是通过checkpoint
给出的定位描述符。
定位描述符可以是静态的字符串、或人类可读的描述、或一个 correlation ID(例如, 来自 HTTP 请求头的信息)。
当全局调试模式和
checkpoint()
都开启的时候,checkpoint 的 stacks 输出会作为 suppressed 错误输出,按照声明顺序添加在操作符图(graph)的后面。
# 记录流的日志
除了基于 stack trace 的调试和分析,还有一个有效的工具可以跟踪异步序列并记录日志。
就是 log()
操作符。将其加到操作链上之后,它会读(只读,peek)每一个 在其上游的 Flux
或 Mono
事件(包括 onNext
、onError
、 onComplete
, 以及 订阅、 取消、和 请求)。
边注:关于 logging 的具体实现
log
操作符通过SLF4J
使用类似 Log4J 和 Logback 这样的公共的日志工具, 如果 SLF4J 不存在的话,则直接将日志输出到控制台。控制台使用
System.err
记录WARN
和ERROR
级别的日志,使用System.out
记录其他级别的日志。如果你喜欢使用 JDK
java.util.logging
,在 3.0.x 你可以设置 JDK 的系统属性reactor.logging.fallback
。
假设我们配置并激活了 logback,以及一个形如 range(1,10).take(3)
的操作链。通过将 log()
放在 take 之前, 我们就可以看到它内部是如何运行的,以及什么样的事件会向上游传播给 range,如下所示:
Flux<Integer> flux = Flux.range(1, 10)
.log()
.take(3);
flux.subscribe();
2
3
4
输出如下(通过 logger 的 console appender):
10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) // 1
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) // 2
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) // 3
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | cancel() // 4
2
3
4
5
6
这里,除了 logger 自己的格式(时间、线程、级别、消息),log()
操作符 还输出了其他一些格式化的东西:
reactor.Flux.Range.1
是自动生成的日志 类别(category),以防你在操作链中多次使用 同一个操作符。通过它你可以分辨出来是哪个操作符的事件(这里是range
的)。 你可以调用log(String)
方法用自定义的类别替换这个标识符。在几个用于分隔的字符之后, 打印出了实际的事件。这里是一个onSubscribe
调用、一个request
调用、三个onNext
调用, 以及一个cancel
调用。对于第一行的onSubscribe
,我们知道了Subscriber
的具体实现, 通常与操作符指定的实现是一致的,在方括号内有一些额外信息,包括这个操作符是否能够 通过同步或异步融合的方式进行自动优化- 第二行,我们可以看到是一个由下游传播上来的个数无限的请求
- 然后 range 一下发出三个值
- 最后一行,我们看到了
cancel()
最后一行,(4),最有意思。我们看到 take
在这里发挥作用了。在它拿到足够的元素之后, 就将序列切断了。简单来说,take()
导致源在发出用户请求的数量后 cancel()
了。