Reactor - 操作符总结
本内容里,如果一个操作符是专属于
Flux或Mono的,那么会给它注明前缀。 公共的操作符没有前缀。如果一个具体的用例涉及多个操作符的组合,这里以方法调用的方式展现, 会以一个点(.)开头,并将参数置于圆括号内,比如:.methodCall(parameter)。
# 创建一个新序列,它
- 发出一个
T,我已经有了:just- 基于一个
Optional<T>:Mono#justOrEmpty(Optional<T>) - 基于一个可能为
null的 T:Mono#justOrEmpty(T)
- 基于一个
- 发出一个
T,且还是由just方法返回- 但是「懒」创建的:使用
Mono#fromSupplier或用defer包装just
- 但是「懒」创建的:使用
- 发出许多
T,这些元素我可以明确列举出来:Flux#just(T...) - 基于迭代数据结构:
- 一个数组:
Flux#fromArray - 一个集合或 iterable:
Flux#fromIterable - 一个 Integer 的 range:
Flux#range - 一个
Stream提供给每一个订阅:Flux#fromStream(Supplier<Stream>)
- 一个数组:
- 基于一个参数值给出的源:
- 一个
Supplier<T>:Mono#fromSupplier - 一个任务:
Mono#fromCallable,Mono#fromRunnable - 一个
CompletableFuture<T>:Mono#fromFuture
- 一个
- 直接完成:
empty - 立即生成错误:
error- 但是「懒」的方式生成
Throwable:error(Supplier<Throwable>)
- 但是「懒」的方式生成
- 什么都不做:
never - 订阅时才决定:
defer - 依赖一个可回收的资源:
using - 可编程地生成事件(可以使用状态):
- 同步且逐个的:
Flux#generate - 异步(也可同步)的,每次尽可能多发出元素:
Flux#create(Mono#create也是异步的,只不过只能发一个)
- 同步且逐个的:
# 对序列进行转化
- 我想转化一个序列:
- 1对1地转化(比如字符串转化为它的长度):
map- 类型转化:
cast - 为了获得每个元素的序号:
Flux#index
- 类型转化:
- 1对n地转化(如字符串转化为一串字符):
flatMap+ 使用一个工厂方法 - 1对n地转化可自定义转化方法和/或状态:
handle - 对每一个元素执行一个异步操作(如对 url 执行 http 请求):
flatMap+ 一个异步的返回类型为Publisher的方法- 忽略一些数据:在 flatMap lambda 中根据条件返回一个
Mono.empty() - 保留原来的序列顺序:
Flux#flatMapSequential(对每个元素的异步任务会立即执行,但会将结果按照原序列顺序排序) - 当 Mono 元素的异步任务会返回多个元素的序列时:
Mono#flatMapMany
- 忽略一些数据:在 flatMap lambda 中根据条件返回一个
- 1对1地转化(比如字符串转化为它的长度):
- 我想添加一些数据元素到一个现有的序列:
- 在开头添加:
Flux#startWith(T...) - 在最后添加:
Flux#concatWith(T...)
- 在开头添加:
- 我想将
Flux转化为集合(一下都是针对Flux的)- 转化为 List:
collectList,collectSortedList - 转化为 Map:
collectMap,collectMultiMap - 转化为自定义集合:
collect - 计数:
count - reduce 算法(将上个元素的reduce结果与当前元素值作为输入执行reduce方法,如sum)
reduce- 将每次 reduce 的结果立即发出:
scan
- 将每次 reduce 的结果立即发出:
- 转化为一个 boolean 值:
- 对所有元素判断都为true:
all - 对至少一个元素判断为true:
any - 判断序列是否有元素(不为空):
hasElements - 判断序列中是否有匹配的元素:
hasElement
- 对所有元素判断都为true:
- 转化为 List:
- 我想合并 publishers
- 按序连接:
Flux#concat或.concatWith(other)- 即使有错误,也会等所有的 publishers 连接完成:
Flux#concatDelayError - 按订阅顺序连接(这里的合并仍然可以理解成序列的连接):
Flux#mergeSequential
- 即使有错误,也会等所有的 publishers 连接完成:
- 按元素发出的顺序合并(无论哪个序列的,元素先到先合并):
Flux#merge/.mergeWith(other)- 元素类型会发生变化:
Flux#zip/Flux#zipWith
- 元素类型会发生变化:
- 将元素组合:
- 2个 Monos 组成1个
Tuple2:Mono#zipWith - n个 Monos 的元素都发出来后组成一个 Tuple:
Mono#zip
- 2个 Monos 组成1个
- 在终止信号出现时「采取行动」:
- 在 Mono 终止时转换为一个
Mono<Void>:Mono#and - 当 n 个 Mono 都终止时返回
Mono<Void>:Mono#when - 返回一个存放组合数据的类型,对于被合并的多个序列:
- 每个序列都发出一个元素时:
Flux#zip - 任何一个序列发出元素时:
Flux#combineLatest
- 每个序列都发出一个元素时:
- 在 Mono 终止时转换为一个
- 只取各个序列的第一个元素:
Flux#first,Mono#first,mono.or (otherMono).or(thirdMono),`flux.or(otherFlux).or(thirdFlux) - 由一个序列触发(类似于
flatMap,不过「喜新厌旧」):switchMap - 由每个新序列开始时触发(也是「喜新厌旧」风格):
switchOnNext
- 按序连接:
- 我想重复一个序列:
repeat- 但是以一定的间隔重复:
Flux.interval(duration).flatMap(tick -> myExistingPublisher)
- 但是以一定的间隔重复:
- 我有一个空序列,但是
- 我想要一个缺省值来代替:
defaultIfEmpty - 我想要一个缺省的序列来代替:
switchIfEmpty
- 我想要一个缺省值来代替:
- 我有一个序列,但是我对序列的元素值不感兴趣:
ignoreElements- 并且我希望用
Mono来表示序列已经结束:then - 并且我想在序列结束后等待另一个任务完成:
thenEmpty - 并且我想在序列结束之后返回一个
Mono:Mono#then(mono) - 并且我想在序列结束之后返回一个值:
Mono#thenReturn(T) - 并且我想在序列结束之后返回一个
Flux:thenMany
- 并且我希望用
- 我有一个 Mono 但我想延迟完成
- 当有1个或N个其他 publishers 都发出(或结束)时才完成:
Mono#delayUntilOther- 使用一个函数式来定义如何获取「其他 publisher」:
Mono#delayUntil(Function)
- 使用一个函数式来定义如何获取「其他 publisher」:
- 当有1个或N个其他 publishers 都发出(或结束)时才完成:
- 我想基于一个递归的生成序列的规则扩展每一个元素,然后合并为一个序列发出:
- 广度优先:
expand(Function) - 深度优先:
expandDeep(Function)
- 广度优先:
# 窥视(只读)序列
- 再不对序列造成改变的情况下,我想:
- 得到通知或执行一些操作:
- 发出元素:
doOnNext - 序列完成:
Flux#doOnComplete,Mono#doOnSuccess - 因错误终止:
doOnError - 取消:
doOnCancel - 订阅时:
doOnSubscribe - 请求时:
doOnRequest - 完成或错误终止:
doOnTerminate(Mono的方法可能包含有结果)- 但是在终止信号向下游传递 之后 :
doAfterTerminate
- 但是在终止信号向下游传递 之后 :
- 所有类型的信号(
Signal):Flux#doOnEach - 所有结束的情况(完成complete、错误error、取消cancel):
doFinally
- 发出元素:
- 记录日志:
log
- 得到通知或执行一些操作:
- 我想知道所有的事件:
- 每一个事件都体现为一个
single对象:- 执行 callback:
doOnEach - 每个元素转化为
single对象:materialize- 在转化回元素:
dematerialize
- 在转化回元素:
- 执行 callback:
- 转化为一行日志:
log
- 每一个事件都体现为一个
# 过滤序列
- 我想过滤一个序列
- 基于给定的判断条件:
filter- 异步地进行判断:
filterWhen
- 异步地进行判断:
- 仅限于指定类型的对象:
ofType - 忽略所有元素:
ignoreElements - 去重:
- 对于整个序列:
Flux#distinct - 去掉连续重复的元素:
Flux#distinctUntilChanged
- 对于整个序列:
- 基于给定的判断条件:
- 我只想要一部分序列:
- 只要 N 个元素:
- 从序列的第一个元素开始算:
Flux#take(long)- 取一段时间内发出的元素:
Flux#take(Duration) - 只取第一个元素放到
Mono中返回:Flux#next() - 使用
request(N)而不是取消:Flux#limitRequest(long)
- 取一段时间内发出的元素:
- 从序列的最后一个元素倒数:
Flux#takeLast - 直到满足某个条件(包含):
Flux#takeUntil(基于判断条件),Flux#takeUntilOther(基于对 publisher 的比较) - 直到满足某个条件(不包含):
Flux#takeWhile
- 从序列的第一个元素开始算:
- 最多只取 1 个元素:
- 给定序号:
Flux#elementAt - 最后一个:
.takeLast(1)- 如果为序列空则发出错误信号:
Flux#last() - 如果序列为空则返回默认值:
Flux#last(T)
- 如果为序列空则发出错误信号:
- 给定序号:
- 跳过一些元素:
- 从序列的第一个元素开始跳过:
Flux#skip(long)- 跳过一段时间内发出的元素:
Flux#skip(Duration)
- 跳过一段时间内发出的元素:
- 跳过最后的 n 个元素:
Flux#skipLast - 直到满足某个条件(包含):
Flux#skipUntil(基于判断条件),Flux#skipUntilOther(基于对 publisher 的比较) - 直到满足某个条件(不包含):
Flux#skipWhile
- 从序列的第一个元素开始跳过:
- 采样:
- 给定采样周期:
Flux#sample(Duration)- 取采样周期里的第一个元素而不是最后一个:
sampleFirst
- 取采样周期里的第一个元素而不是最后一个:
- 基于另一个 publisher:
Flux#sample(Publisher) - 基于 publisher「超时」:
Flux#sampleTimeout(每一个元素会触发一个 publisher,如果这个 publisher 不被下一个元素触发的 publisher 覆盖就发出这个元素)
- 给定采样周期:
- 只要 N 个元素:
- 我只想要一个元素(如果多于一个就返回错误)
- 如果序列为空,发出错误信号:
Flux#single() - 如果序列为空,发出一个缺省值:
Flux#single(T) - 如果序列为空就返回一个空序列:
Flux#singleOrEmpty
- 如果序列为空,发出错误信号:
# 错误处理
- 我想创建一个错误序列:
error- 替换一个完成的
Flux:.concat(Flux.error(e)) - 替换一个完成的
Mono:.then(Mono.error(e)) - 如果元素超时未发出:
timeout - 「懒」创建:
error(Supplier<Throwable>)
- 替换一个完成的
- 我想要类似 try/catch 的表达方式:
- 抛出异常:
error - 捕获异常:
- 然后返回缺省值:
onErrorReturn - 然后返回一个
Flux或Mono:onErrorResume - 包装异常后再抛出:
.onErrorMap(t -> new RuntimeException(t))
- 然后返回缺省值:
- finally 代码块:
doFinally - Java 7 之后的 try-with-resources 写法:
using工厂方法
- 抛出异常:
- 我想从错误中恢复
- 返回一个缺省的:
- 的值:
onErrorReturn Publisher:Flux#onErrorResume和Mono#onErrorResume
- 的值:
- 重试:
retry- 由一个用于伴随 Flux 触发:
retryWhen
- 由一个用于伴随 Flux 触发:
- 返回一个缺省的:
- 我想处理回压错误(向上游发出「MAX」的 request,如果下游的 request 比较少,则应用策略)
- 抛出
IllegalStateException:Flux#onBackpressureError - 丢弃策略:
Flux#onBackpressureDrop- 但是不丢弃最后一个元素:
Flux#onBackpressureLatest
- 但是不丢弃最后一个元素:
- 缓存策略(有限或无限):
Flux#onBackpressureBuffer- 当有限的缓存空间用满则应用给定策略:
Flux#onBackpressureBuffer带有策略BufferOverflowStrategy
- 当有限的缓存空间用满则应用给定策略:
- 抛出
# 基于时间的操作
- 我想将元素转换为带有时间信息的
Tuple2<Long, T>- 从订阅时开始:
elapsed - 记录时间戳:
timestamp
- 从订阅时开始:
- 如果元素间延迟过长则中止序列:
timeout - 以固定的周期发出元素:
Flux#interval - 在一个给定的延迟后发出
0:staticMono.delay. - 我想引入延迟:
- 对每一个元素:
Mono#delayElement,Flux#delayElements - 延迟订阅:
delaySubscription
- 对每一个元素:
# 拆分 Flux
- 我想将一个
Flux<T>拆分为一个Flux<Flux<T>>:- 以个数为界:
window(int)- 会出现重叠或丢弃的情况:
window(int, int)
- 会出现重叠或丢弃的情况:
- 以时间为界:
window(Duration)- 会出现重叠或丢弃的情况:
window(Duration, Duration)
- 会出现重叠或丢弃的情况:
- 以个数或时间为界:
windowTimeout(int, Duration) - 基于对元素的判断条件:
windowUntil- 触发判断条件的元素会分到下一波(
cutBefore变量):.windowUntil(predicate, true) - 满足条件的元素在一波,直到不满足条件的元素发出开始下一波:
windowWhile(不满足条件的元素会被丢弃)
- 触发判断条件的元素会分到下一波(
- 通过另一个 Publisher 的每一个 onNext 信号来拆分序列:
window(Publisher),windowWhen
- 以个数为界:
- 我想将一个
Flux<T>的元素拆分到集合- 拆分为一个一个的
List:- 以个数为界:
buffer(int)- 会出现重叠或丢弃的情况:
buffer(int, int)
- 会出现重叠或丢弃的情况:
- 以时间为界:
buffer(Duration)- 会出现重叠或丢弃的情况:
buffer(Duration, Duration)
- 会出现重叠或丢弃的情况:
- 以个数或时间为界:
bufferTimeout(int, Duration) - 基于对元素的判断条件:
bufferUntil(Predicate)- 触发判断条件的元素会分到下一个buffer:
.bufferUntil(predicate, true) - 满足条件的元素在一个buffer,直到不满足条件的元素发出开始下一buffer:
bufferWhile(Predicate)
- 触发判断条件的元素会分到下一个buffer:
- 通过另一个 Publisher 的每一个 onNext 信号来拆分序列:
buffer(Publisher),bufferWhen
- 以个数为界:
- 拆分到指定类型的 "collection":
buffer(int, Supplier<C>)
- 拆分为一个一个的
- 我想将
Flux<T>中具有共同特征的元素分组到子 Flux:groupBy(Function<T,K>)TIP:注意返回值是Flux<GroupedFlux<K, T>>,每一个GroupedFlux具有相同的 key 值K,可以通过key()方法获取。
# 回到同步的世界
- 我有一个
Flux<T>,我想:- 在拿到第一个元素前阻塞:
Flux#blockFirst- 并给出超时时限:
Flux#blockFirst(Duration)
- 并给出超时时限:
- 在拿到最后一个元素前阻塞(如果序列为空则返回 null):
Flux#blockLast- 并给出超时时限:
Flux#blockLast(Duration)
- 并给出超时时限:
- 同步地转换为
Iterable<T>:Flux#toIterable - 同步地转换为 Java 8
Stream<T>:Flux#toStream
- 在拿到第一个元素前阻塞:
- 我有一个
Mono<T>,我想:- 在拿到元素前阻塞:
Mono#block- 并给出超时时限:
Mono#block(Duration)
- 并给出超时时限:
- 转换为
CompletableFuture<T>:Mono#toFuture
- 在拿到元素前阻塞:
编辑此页 (opens new window)
更新时间: 2024/11/02, 09:43:06