Spring Boot - 响应式远程调用
# 前言
在 Spring 5 之前,如果我们想要调用其他系统提供的 HTTP 服务,通常可以使用 Spring 提供的 RestTemplate 来访问,不过由于 RestTemplate 是 Spring 3 中引入的同步阻塞式 HTTP 客户端,因此存在一定性能瓶颈。根据 Spring 官方文档介绍,在将来的版本中它可能会被弃用。
作为替代,Spring 官方已在 Spring 5 中引入了 WebClient 作为非阻塞式 Reactive HTTP 客户端。下面通过样例演示如何使用 WebClient。
需要 Spring Boot 3 以上才能使用 WebClient。
# 基本介绍
# 什么是 WebClient
从 Spring 5 开始,Spring 中全面引入了 Reactive 响应式编程。而 WebClient 则是 Spring WebFlux 模块提供的一个非阻塞的基于响应式编程的进行 Http 请求的客户端工具。
由于 WebClient 的请求模式属于异步非阻塞,能够以少量固定的线程处理高并发的 HTTP 请求。因此,从 Spring 5 开始,HTTP 服务之间的通信我们就可以考虑使用 WebClient 来取代之前的 RestTemplate。
# WebClient 的优势
与 RestTemplate 相比,WebClient 有如下优势:
- 非阻塞,Reactive 的,并支持更高的并发性和更少的硬件资源
- 提供利用 Java 8 lambdas 的函数 API
- 支持同步和异步方案
- 支持从服务器向上或向下流式传输
RestTemplate 不适合在非阻塞应用程序中使用,因此 Spring WebFlux 应用程序应始终使用 WebClient。在大多数高并发场景中,WebClient 也应该是 Spring MVC 中的首选,并且用于编写一系列远程,相互依赖的调用。
# 安装配置
编辑 pom.xml 文件,添加 Spring WebFlux 依赖,从而可以使用 WebClient。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2
3
4
# 创建 WebClient 实例
从 WebClient 的源码中可以看出,WebClient 接口提供了三个不同的静态方法来创建 WebClient 实例:
# 利用 create() 创建
下面利用 create()
方法创建一个 WebClient 对象,并利用该对象请求一个网络接口,最后将结果以字符串的形式打印出来。
注意:由于利用 create()
创建的 WebClient 对象没有设定 baseURL,所以这里的 uri()
方法相当于重写 baseURL。
public void test() {
WebClient webClient = WebClient.create();
Mono<String> mono = webClient
.get() // GET 请求
.uri("http://jsonplaceholder.typicode.com/posts/1") // 请求路径
.retrieve() // 获取响应体
.bodyToMono(String.class); // 响应数据类型转换
System.out.println(mono.block());
}
2
3
4
5
6
7
8
9
10
11
12
# 利用 create(String baseUrl) 创建
下面利用 create(String baseUrl)
方法创建一个 WebClient 对象,并利用该对象请求一个网络接口,最后将结果以字符串的形式打印出来。
注意:由于利用 create(String baseUrl)
创建的 WebClient 对象时已经设定了 baseURL,所以 uri()
方法会将返回的结果和 baseUrl 进行拼接组成最终需要远程请求的资源 URL。
public void test() {
WebClient webClient = WebClient.create("http://jsonplaceholder.typicode.com");
Mono<String> mono = webClient
.get() // GET 请求
.uri("/posts/1") // 请求路径
.retrieve() // 获取响应体
.bodyToMono(String.class); // 响应数据类型转换
System.out.println(mono.block());
}
2
3
4
5
6
7
8
9
10
11
# 利用 builder 创建(推荐)
下面使用 builder()
返回一个 WebClient.Builder
,然后再调用 build 就可以返回 WebClient 对象。并利用该对象请求一个网络接口,最后将结果以字符串的形式打印出来。
注意:由于返回的不是 WebClient 类型而是 WebClient.Builder
,我们可以通过返回的 WebClient.Builder
设置一些配置参数(例如:baseUrl、header、cookie 等),然后再调用 build 就可以返回 WebClient 对象了。
使用 WebClient.builder()
指定额外的配置:
- uriBuilderFactory: 用作定制 baseURL
- defaultUriVariables: 扩展URI模板时使用的默认值
- defaultHeader: 设置每个请求的默认 Header
- defaultCookie: 设置每个请求的默认 Cookie
- defaultRequest: 设置每个消费者自定义请求
- filter: 请求过滤器
- exchangeStrategies: HTTP 消息读取器/写入器自定义
- clientConnector: HTTP客户端库设置
public void test() {
WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.defaultHeader(HttpHeaders.USER_AGENT,"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko)")
.defaultCookie("ACCESS_TOKEN", "test_token")
.build();
Mono<String> mono = webClient
.get() // GET 请求
.uri("/posts/1") // 请求路径
.retrieve() // 获取响应体
.bodyToMono(String.class); //响应数据类型转换
System.out.println(mono.block());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
不变性
一旦构建完成,WebClient 就是不可变的。但是,可以克隆它并构建一个修改后的副本,如下所示:
WebClient client1 = WebClient.builder()
.filter(filterA).filter(filterB).build();
WebClient client2 = client1.mutate()
.filter(filterC).filter(filterD).build();
2
3
4
5
# GET 请求
# 使用 retrieve()
# 获取 String 结果数据
下面代码将响应结果映射为一个 String 字符串,并打印出来。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
Mono<String> mono = webClient
.get() // GET 请求
.uri("/posts/1") // 请求路径
.retrieve() // 获取响应体
.bodyToMono(String.class); // 响应数据类型转换
System.out.println(mono.block());
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 将结果转换为对象
当响应的结果是 JSON 时,也可以直接指定为一个 Object,WebClient 将接收到响应后把 JSON 字符串转换为对应的对象。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
Mono<PostBean> mono = webClient
.get() // GET 请求
.uri("/posts/1") // 请求路径
.retrieve() // 获取响应体
.bodyToMono(PostBean.class); // 响应数据类型转换
System.out.println(mono.block());
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
其中定义的实体 Bean 代码如下:
@Getter
@Setter
@ToString
public class PostBean {
private int userId;
private int id;
private String title;
private String body;
}
2
3
4
5
6
7
8
9
# 将结果转成集合
假设接口返回的是一个 json 数组。
我们可以将其转成对应的 Bean 集合:
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
Flux<PostBean> flux = webClient
.get() // GET 请求
.uri("/posts") // 请求路径
.retrieve() // 获取响应体
.bodyToFlux(PostBean.class); // 响应数据类型转换
List<PostBean> posts = flux.collectList().block();
System.out.println("结果数:" + posts.size());
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 参数传递的几种方式
下面 3 种方式的结果都是一样的。
使用占位符的形式传递参数:
Mono<String> mono = webClient
.get() // GET 请求
.uri("/{1}/{2}", "posts", "1") // 请求路径
.retrieve() // 获取响应体
.bodyToMono(String.class); // 响应数据类型转换
2
3
4
5
6
另一种使用占位符的形式:
String type = "posts";
int id = 1;
Mono<String> mono = webClient
.get() // GET 请求
.uri("/{type}/{id}", type, id) // 请求路径
.retrieve() // 获取响应体
.bodyToMono(String.class); // 响应数据类型转换
System.out.println(mono.block());
2
3
4
5
6
7
8
9
我们也可以使用 map 装载参数:
Map<String,Object> map = new HashMap<>();
map.put("type", "posts");
map.put("id", 1);
Mono<String> mono = webClient
.get() // GET 请求
.uri("/{type}/{id}", map) // 请求路径
.retrieve() // 获取响应体
.bodyToMono(String.class); //响应数据类型转换
2
3
4
5
6
7
8
9
# subscribe 订阅(非阻塞式调用)
前面的样例我们都是人为地使用 block 方法来阻塞当前程序。其实 WebClient 是异步的,也就是说等待响应的同时不会阻塞正在执行的线程。只有在响应结果准备就绪时,才会发起通知。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
System.out.println("--- begin ---");
Mono<String> mono = webClient
.get() // GET 请求
.uri("/posts/1") // 请求路径
.retrieve() // 获取响应体
.bodyToMono(String.class); // 响应数据类型转换
// 订阅(异步处理结果)
mono.subscribe(result -> {
System.out.println(result);
});
System.out.println("--- end ---");
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 使用 exchange()
方法介绍
- 前面我们都是使用
retrieve()
方法直接获取到了响应的内容,如果我们想获取到响应的头信息、Cookie 等,可以在通过 WebClient 请求时把调用retrieve()
改为调用exchange()
。 - 通过
exchange()
方法可以访问到代表响应结果的对象,通过该对象我们可以获取响应码、contentType、contentLength、响应消息体等。
使用样例
下面代码请求一个网络接口,并将响应体、响应头、响应码打印出来。其中响应体的类型设置为 String。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
Mono<ClientResponse> mono = webClient
.get() // GET 请求
.uri("/posts/1") // 请求路径
.exchange();
// 获取完整的响应对象
ClientResponse response = mono.block();
HttpStatus statusCode = response.statusCode(); // 获取响应码
int statusCodeValue = response.rawStatusCode(); // 获取响应码值
Headers headers = response.headers(); // 获取响应头
// 获取响应体
Mono<String> resultMono = response.bodyToMono(String.class);
String body = resultMono.block();
// 输出结果
System.out.println("statusCode:" + statusCode);
System.out.println("statusCodeValue:" + statusCodeValue);
System.out.println("headers:" + headers.asHttpHeaders());
System.out.println("body:" + body);
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# POST 请求
# 使用 retrieve()
# 发送一个 JSON 格式数据(使用 json 字符串)
下面代码使用 post 方式发送一个 json 格式的字符串,并将结果打印出来(以字符串的形式)。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
// 需要提交的 json 字符串
String jsonStr = "{\"userId\": 222,\"title\": \"abc\",\"body\": \"航歌\"}";
// 发送请求
Mono<String> mono = webClient
.post() // POST 请求
.uri("/posts") // 请求路径
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(BodyInserters.fromObject(jsonStr))
.retrieve() // 获取响应体
.bodyToMono(String.class); //响应数据类型转换
// 输出结果
System.out.println(mono.block());
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
发送一个 JSON 格式数据(使用 Java Bean)
下面代码使用 post 方式发送一个 Bean 对象,并将结果打印出来(以字符串的形式)。结果同上面是一样的:
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
// 要发送的数据对象
PostBean postBean = new PostBean();
postBean.setUserId(222);
postBean.setTitle("abc");
postBean.setBody("航歌");
// 发送请求
Mono<String> mono = webClient
.post() // POST 请求
.uri("/posts") // 请求路径
.contentType(MediaType.APPLICATION_JSON_UTF8)
.syncBody(postBean)
.retrieve() // 获取响应体
.bodyToMono(String.class); // 响应数据类型转换
// 输出结果
System.out.println(mono.block());
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
上面发送的 Bean 对象实际上会转成如下格式的 JSON 数据提交
# 使用 Form 表单的形式提交数据
下面样例使用 POST 方式发送 multipart/form-data
格式的数据:
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
//提交参数设置
MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
map.add("title", "abc");
map.add("body", "航歌");
// 发送请求
Mono<String> mono = webClient
.post() // POST 请求
.uri("/posts") // 请求路径
.contentType(MediaType.APPLICATION_FORM_URLENCODED)
.body(BodyInserters.fromFormData(map))
.retrieve() // 获取响应体
.bodyToMono(String.class); // 响应数据类型转换
// 输出结果
System.out.println(mono.block());
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
上面代码最终会通过如下这种 form 表单方式提交数据。
将结果转成自定义对象
上面样例我们都是将响应结果以 String 形式接收,其实 WebClient 还可以自动将响应结果转成自定的对象或则数组。
# 设置 url 参数
如果 url 地址上面需要传递一些参数,可以使用占位符的方式:
String url = "http://jsonplaceholder.typicode.com/{1}/{2}";
String url = "http://jsonplaceholder.typicode.com/{type}/{id}";
2
# subscribe 订阅(非阻塞式调用)
前面的样例我们都是人为地使用 block 方法来阻塞当前程序。其实 WebClient 是异步的,也就是说等待响应的同时不会阻塞正在执行的线程。只有在响应结果准备就绪时,才会发起通知。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
System.out.println("--- begin ---");
// 需要提交的 json 字符串
String jsonStr = "{\"userId\": 222,\"title\": \"abc\",\"body\": \"航歌\"}";
Mono<String> mono = webClient
.post() // POST 请求
.uri("/posts") // 请求路径
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(BodyInserters.fromObject(jsonStr))
.retrieve() // 获取响应体
.bodyToMono(String.class); // 响应数据类型转换
// 订阅(异步处理结果)
mono.subscribe(result -> {
System.out.println(result);
});
System.out.println("--- end ---");
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 使用 exchange()
方法介绍
- 前面我们都是使用 retrieve() 方法直接获取到了响应的内容,如果我们想获取到响应的头信息、Cookie 等,可以在通过 WebClient 请求时把调用 retrieve() 改为调用 exchange()。
- 通过 exchange() 方法可以访问到代表响应结果的对象,通过该对象我们可以获取响应码、contentType、contentLength、响应消息体等。
使用样例
下面代码请求一个网络接口,并将响应体、响应头、响应码打印出来。其中响应体的类型设置为 String。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://jsonplaceholder.typicode.com")
.build();
@GetMapping("/test")
public void test() {
// 需要提交的 json 字符串
String jsonStr = "{\"userId\": 222,\"title\": \"abc\",\"body\": \"航歌\"}";
// 发送请求
Mono<ClientResponse> mono = webClient
.post() // POST 请求
.uri("/posts") // 请求路径
.contentType(MediaType.APPLICATION_JSON_UTF8)
.body(BodyInserters.fromObject(jsonStr))
.exchange();
// 获取完整的响应对象
ClientResponse response = mono.block();
HttpStatus statusCode = response.statusCode(); // 获取响应码
int statusCodeValue = response.rawStatusCode(); // 获取响应码值
Headers headers = response.headers(); // 获取响应头
// 获取响应体
Mono<String> resultMono = response.bodyToMono(String.class);
String body = resultMono.block();
// 输出结果
System.out.println("statusCode:" + statusCode);
System.out.println("statusCodeValue:" + statusCodeValue);
System.out.println("headers:" + headers.asHttpHeaders());
System.out.println("body:" + body);
return;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# Attributes
您可以向请求添加属性。如果您想传递信息,这很方便通过过滤器链并影响给定请求的过滤器行为。 例如:
public void test() {
WebClient client = WebClient.builder()
.filter((request, next) -> {
Optional<Object> myAttribute = request.attribute("myAttribute");
System.out.println(myAttribute.get());
})
.build();
client.get().uri("https://example.org/")
.attribute("myAttribute", "...")
.retrieve()
.bodyToMono(Void.class);
}
2
3
4
5
6
7
8
9
10
11
12
13
# 过滤器 Filters
通过 WebClient 注册客户端过滤器(ExchangeFilterFunction)。生成器来拦截和修改请求,下例所示的是请求拦截器:
public void test() {
WebClient client = WebClient.builder()
.filter((request, next) -> {
ClientRequest filtered = ClientRequest.from(request)
.header("foo", "bar")
.build();
return next.exchange(filtered);
})
.build();
}
2
3
4
5
6
7
8
9
10
当然这样不明确,不知道这是拦截请求的还是拦截响应的,那么我们可以手动使用请求的拦截器或者响应的拦截器:
/**
* WebClient 使用 filter 拦截器
*/
@Slf4j
public class WebClientFilterDemo {
private static ExchangeFilterFunction filterRequest() {
// ExchangeFilterFunction.ofRequestProcessor 是拦截请求 Request
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
// 获取 attribute
Optional<Object> myAttribute = clientRequest.attribute("myAttribute");
System.out.println(myAttribute.get());
return Mono.just(clientRequest);
});
}
private static ExchangeFilterFunction logResponseStatus() {
// ExchangeFilterFunction.ofResponseProcessor 是拦截响应 Respinse
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
log.info("Response Status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
public static void main(String[] args) throws InterruptedException {
WebClient webClient = WebClient.builder().filter(logResponseStatus()).build();
webClient.get().uri("http://127.0.0.1:8020/order/findOrderByUserId?userId={userId}", 1)
.attribute("myAttribute", "myAttribute")
.exchange()
.subscribe(r -> {
System.out.println(r.headers());
r.bodyToFlux(Order.class).subscribe(System.out::println);
});
// 休眠一会,否则 WebClient 中的线程池还没执行,看不到效果
TimeUnit.SECONDS.sleep(5);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
也可以用于跨领域的问题,比如身份验证。以下示例通过静态工厂方法使用筛选器进行基本身份验证:
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = WebClient.builder()
.filter(basicAuthentication("user", "password"))
.build();
2
3
4
5
可以通过改变现有的 WebClient 实例来添加或删除过滤器,从而产生不影响原始实例的新 WebClient 实例。例如:
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = webClient.mutate()
.filters(filterList -> {
filterList.add(0, basicAuthentication("user", "password"));
})
.build();
2
3
4
5
6
7
# 超时时间设定
要配置连接超时
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
2
3
4
5
6
为所有请求配置响应超时
HttpClient httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(2));
// Create WebClient...
2
3
4
为特定请求配置响应超时
WebClient.create().get()
.uri("https://example.org/path")
.httpRequest(httpRequest -> {
HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
reactorRequest.responseTimeout(Duration.ofSeconds(2));
})
.retrieve()
.bodyToMono(String.class);
2
3
4
5
6
7
8
分别配置读取或写入超时
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create()
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10))
.addHandlerLast(new WriteTimeoutHandler(10)));
// Create WebClient...
2
3
4
5
6
7
8
9
# 异常处理
# 默认异常
当我们使用 WebClient 发送请求时, 如果接口返回的不是 200 状态(而是 4xx、5xx 这样的异常状态),则会抛出 WebClientResponseException 异常。
因此我们可以捕获 WebClientResponseException 异常。比如自定义 Spring Boot 的全局异常捕获类来捕获。
# 适配异常
我们可以通过 doOnError 方法适配所有异常,比如下面代码在发生异常时将其转为一个自定义的异常抛出(这里假设使用 RuntimeException):
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@GetMapping("/test")
public void test() {
Mono<String> mono = webClient
.get() // GET 请求
.uri("/xxxxx") // 请求一个不存在的路径
.retrieve() // 获取响应体
.bodyToMono(String.class) // 响应数据类型转换
.doOnError(WebClientResponseException.class, err -> {
System.out.println("发生错误:" +err.getRawStatusCode() + " " + err.getResponseBodyAsString());
throw new RuntimeException(err.getResponseBodyAsString());
});
System.out.println(mono.block());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
当然也可以处理特定的异常,比如 ConnectTimeoutException 异常:
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@GetMapping("/test")
public void test() {
Mono<String> mono = webClient
.get() // GET 请求
.uri("/xxxxx") // 请求一个不存在的路径
.retrieve() // 获取响应体
.bodyToMono(String.class) // 响应数据类型转换
.doOnError(ConnectTimeoutException.class, err -> {
System.out.println("发生错误:" +err.getRawStatusCode() + " " + err.getResponseBodyAsString());
throw new RuntimeException(err.getResponseBodyAsString());
});
System.out.println(mono.block());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 分类异常处理
上面的异常处理方法,只能处理指定的某种异常:ConnectTimeoutException。如果说我们想让异常处理相对通用一些该怎么办?有的小伙伴可能会想到拦截异常的父类 Exception,当然这也是一种办法。
.doOnError(Exception.class, err -> {
System.out.println("发生错误:" +err.getMessage() );
});
2
3
我们下面为大家介绍一种,针对 HTTP 响应异常处理更友好的一种方式。通常来说,异常可以分为两种:
一种是客户端输入或访问异常,比如:访问的资源不存在 404,没有权限访问资源 403,输入的数据不符合格式等等。这种异常通常是用户访问了不该访问的资源,或者输入了不该输入的数据导致的。通常用HTTP状态码表示在 400-499 范围内。
另一种是服务端内部错误,比如:500 服务内部错误、502 网关错误等等。这种异常通常和用户没什么关系,是IT基础设施或者编程导致的异常。 所以我们只需要针对上面的两类异常进行处理即可。如下文代码所示:
e.is4xxClientError()
表示的是 400-499 状态码段的异常e.is5xxClientError()
表示的是 500-599 状态码段的异常
public void test() throws Exception {
Mono<String> mono = getWebClient()
.get() // 发送 GET 请求
.uri("/postss/1") // 服务请求路径,基于 baseurl
.retrieve() // 获取响应体
.onStatus(e -> e.is4xxClientError(), resp -> {
System.out.println("发生客户端输入错误:" + resp.statusCode().value() + " "
+ resp.statusCode().getReasonPhrase());
return Mono.error(new RuntimeException("请求失败"));
})
.onStatus(e -> e.is5xxServerError(), resp -> {
System.out.println("发生服务端错误:" + resp.statusCode().value() + " "
+ resp.statusCode().getReasonPhrase());
return Mono.error(new RuntimeException("服务器异常"));
})
.bodyToMono(String.class); // 响应数据类型转换
System.out.println(mono.block());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 在发生异常时返回默认值
使用 onErrorReturn 返回默认值。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@GetMapping("/test")
public String test() {
Mono<String> mono = webClient
.get() // GET 请求
.uri("/xxxxx") // 请求一个不存在的路径
.retrieve() // 获取响应体
.bodyToMono(String.class)
.onErrorReturn("请求失败!!!"); // 失败时的默认值
return mono.block();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 重试机制
# 设置重试次数
使用 retry()
方法可以设置当请求异常时的最大重试次数,如果不带参数则表示无限重试,直至成功。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@GetMapping("/test")
public String test() {
Mono<String> mono = webClient
.get() // GET 请求
.uri("/data") // 请求一个不存在的路径
.retrieve() // 获取响应体
.bodyToMono(String.class)
.timeout(Duration.ofSeconds(3)) // 3 秒超时
.retry(3); // 重试 3 次
return mono.block();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 设置重试时间间隔
使用 retry 方法默认情况下请求失败后会立刻重新发起请求,如果希望在每次重试前加个时间间隔(等待),可以使用 retryBackoff 方法。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@GetMapping("/test")
public String test() {
Mono<String> mono = webClient
.get() // GET 请求
.uri("/data") // 请求一个不存在的路径
.retrieve() // 获取响应体
.bodyToMono(String.class)
.timeout(Duration.ofSeconds(3)) // 3秒超时
.retryBackoff(3, Duration.ofSeconds(10)); // 重试 3 次,间隔 10 秒
return mono.block();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 指定需要重试的异常
不管是前面的 retry 方法还是 retryBackoff 方法,设置后无论发生何种异常都会进行重试。如果需要更加精细的控制,比如指定的异常才需要重试,则可以使用 retryWhen 方法。
在使用 retryWhen 方法之前,我们项目中还需要先引入 reactor-extra 包,因为需要用到里面的 Retry 类。
<dependency>
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
</dependency>
2
3
4
注意:如果还需要设置对应的重试次数和间隔时间,需要分别通过 Retry 的 retryMax 和 backoff 方法进行设置。
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@GetMapping("/test")
public String test() {
// 定义重试条件
Retry<?> retry = Retry.anyOf(RuntimeException.class) // 只有 RuntimeException 异常才重试
.retryMax(3) // 重试 3 次
.backoff(Backoff.fixed(Duration.ofSeconds(10))); // 每次重试间隔 10 秒
Mono<String> mono = webClient
.get() // GET 请求
.uri("/data") // 请求一个不存在的路径
.retrieve() // 获取响应体
.bodyToMono(String.class)
.timeout(Duration.ofSeconds(3)) // 3 秒超时
.retryWhen(retry);
return mono.block();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
下面样例除了 RuntimeException 异常外,发生其它一切异常都会进行重试:
@RestController
public class HelloController {
// 创建 WebClient 对象
private WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@GetMapping("/test")
public String test() {
// 定义重试条件
Retry<?> retry = Retry.allBut(RuntimeException.class) // 除了 RuntimeException 异常都重试
.retryMax(3) // 重试 3 次
.backoff(Backoff.fixed(Duration.ofSeconds(10))); // 每次重试间隔 10 秒
Mono<String> mono = webClient
.get() // GET 请求
.uri("/data") // 请求一个不存在的路径
.retrieve() // 获取响应体
.bodyToMono(String.class)
.timeout(Duration.ofSeconds(3)) // 3 秒超时
.retryWhen(retry);
return mono.block();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 阻塞用法
通过在结果的末尾阻塞,可以在同步模式下使用 WebClient
Person person = client.get().uri("/person/{id}", i).retrieve()
.bodyToMono(Person.class)
.block();
List<Person> persons = client.get().uri("/persons").retrieve()
.bodyToFlux(Person.class)
.collectList()
.block();
2
3
4
5
6
7
8
9
如果需要进行多次调用,更有效的方法是避免单独阻塞每个响应,而是等待综合结果
Mono<Person> personMono = client.get().uri("/person/{id}", personId)
.retrieve().bodyToMono(Person.class);
Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
.retrieve().bodyToFlux(Hobby.class).collectList();
Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
Map<String, String> map = new LinkedHashMap<>();
map.put("person", person);
map.put("hobbies", hobbies);
return map;
})
.block();
2
3
4
5
6
7
8
9
10
11
12
13
以上仅仅是一个例子。有许多其他的模式和操作符可以组合成一个反应式的管道,进行许多远程调用,可能是嵌套的、相互依赖的,直到最后都不会阻塞。
有了 Flux 或 Mono,你就永远不必在 Spring MVC 或 Spring WebFlux 控制器中阻塞。只需从控制器方法返回结果反应类型。同样原理也适用于 Kotlin 协同程序和 Spring WebFlux,只需在控制器方法中使用暂停函数或返回流。
# HTTP Interface
上面的 WebClient 是编程式远程调用,此外 Spring 允许我们通过定义接口的方式,给任意位置发送 http 请求,实现远程调用,可以用来简化 HTTP 远程访问。需要 WebFlux 场景才可,这种方式和 OpenFeign 类似。
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2
3
4
定义接口
public interface BingService {
@GetExchange(url = "/search")
String search(@RequestParam("keyword") String keyword, @RequsetHeader("Authorization") String auth);
}
2
3
4
5
创建代理 & 测试
@SpringBootTest
class Boot05TaskApplicationTests {
@Test
void contextLoads() throws InterruptedException {
// 1、创建客户端
WebClient client = WebClient.builder()
.baseUrl("https://www.youngkbt.cn")
.codecs(clientCodecConfigurer -> {
clientCodecConfigurer
.defaultCodecs()
.maxInMemorySize(256*1024*1024);
// 响应数据量太大有可能会超出 BufferSize,所以这里设置的大一点
})
.build();
// 2、创建工厂
HttpServiceProxyFactory factory = HttpServiceProxyFactory.builder(WebClientAdapter.forClient(client)).build();
// 3、获取代理对象
BingService bingService = factory.createClient(BingService.class);
// 4、测试调用
Mono<String> search = bingService.search("youngkbt");
System.out.println("==========");
search.subscribe(str -> System.out.println(str));
Thread.sleep(100000);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
当然上面从头到尾写一个请求流程,其实最终的目的主要是调用接口的方法,从而发送请求。
但是当我们有很多类似于 BingService 的接口类来请求时候,每次都要从头到尾创建客户端、工厂、代理对象来调用接口类发送请求,这样是非常繁琐的,所以我们可以将创建客户端、工厂封装成一个配置类。
@Configuration
public class FactoryConfig {
@Bean
public HttpServiceProxyFactory getHttpServiceProxyFactory(){
// 1、创建客户端
WebClient client = WebClient.builder()
.baseUrl("https://www.youngkbt.cn")
.codecs(clientCodecConfigurer -> {
clientCodecConfigurer
.defaultCodecs()
.maxInMemorySize(256*1024*1024);
// 响应数据量太大有可能会超出 BufferSize,所以这里设置的大一点
})
.build();
// 2、创建工厂
return HttpServiceProxyFactory.builder(WebClientAdapter.forClient(client)).build();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
然后
@SpringBootTest
class Boot05TaskApplicationTests {
@Autowired
private FactoryConfig factoryConfig;
@Test
void contextLoads() throws InterruptedException {
HttpServiceProxyFactory factory = factoryConfig.getHttpServiceProxyFactory();
// 获取代理对象
BingService bingService = factoryConfig.getHttpServiceProxyFactory(BingService.class);
// 测试调用
Mono<String> search = bingService.search("youngkbt");
System.out.println("==========");
search.subscribe(str -> System.out.println(str));
Thread.sleep(100000);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21