反应式消费REST API
在第8章中,我们使用RestTemplate将客户端请求发送到Taco Cloud API上。RestTemplate有着很久的历史,从Spring 3.0版本开始引入。我们曾经使用它为应用发送了无数的请求,但是RestTemplate提供的方法处理的都是非反应式的领域类型和集合。这意味着,我们如果想要以反应式的方式使用响应数据,就需要使用Flux或Mono对其进行包装。如果已经有了Flux或Mono,想要通过POST或PUT请求发送它们,那么需要在发送请求之前将数据提取到一个非反应式的类型中。
如果能够有一种方式让RestTemplate原生使用反应式类型就好了。不用担心,Spring 提供了WebClient,它可以作为反应式版本的RestTemplate。WebClient能够让我们请求外部API时发送和接收反应式类型。
WebClient的使用方式与RestTemplate有很大的差别。RestTemplate有多个方法处理不同类型的请求,而WebClient有一个流畅(fluent)的构建者风格接口,能够让我们描述和发送请求。WebClient的通用模式如下:
-
创建WebClient实例(或注入WebClient bean);
-
指定要发送请求的HTTP方法;
-
指定请求中URI和头信息;
-
提交请求;
-
消费响应。
接下来,我们实际看几个WebClient的例子,首先是如何使用WebClient发送HTTP GET请求。
获取资源
作为使用WebClient的样例,假设我们需要通过Taco Cloud API根据ID获取Ingredient对象。如果使用RestTemplate,那么我们可能会使用getForObject()方法。但是,借助WebClient,我们会构建请求、获取响应并提取一个会发布Ingredient对象的Mono:
Mono<Ingredient> ingredient = WebClient.create()
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);
ingredient.subscribe(i -> { ... });
在这里,我们首先使用create()创建了一个新的WebClient实例,然后使用get()和uri()定义对http://localhost:8080/ingredients/{id}的GET请求,其中{id}占位符会被ingredientId的值替换。接着,retrieve()会执行请求。最后,我们调用bodyToMono()将响应体的载荷提取到Mono<Ingredient>中,就可以继续执行Mono的其他操作了。
为了对bodyToMono()返回Mono执行其他的操作,很重要的一点是要在请求发送之前订阅。发送请求来获取值的集合是非常容易的。例如,如下的代码片段将获取所有配料:
Flux<Ingredient> ingredients = WebClient.create()
.get()
.uri("http://localhost:8080/ingredients")
.retrieve()
.bodyToFlux(Ingredient.class);
ingredients.subscribe(i -> { ... });
获取多个条目与获取单个条目的大部分代码都是相同的。最大的差异在于我们不再使用bodyToMono()将响应体提取为Mono,而是使用bodyToFlux()将其提取为一个Flux。
与bodyToMono()类似,bodyToFlux()返回的Flux还没有被订阅。在数据流过之前,我们可以对Flux添加一些额外的操作(过滤、映射等)。因此,非常重要的一点就是要订阅结果所形成的Flux,否则请求将始终不会发送。
使用基础URI发送请求
你可能会发现,很多请求都会使用一个通用的基础URI。这样一来,创建WebClient bean的时候设置一个基础URI并将其注入所需的位置就是非常有帮助的。这样的bean可以按照如下的方式来声明:
@Bean
public WebClient webClient() {
return WebClient.create("http://localhost:8080");
}
然后,在想要使用基础URI的任意地方,我们都可以将WebClient bean注入,并按照如下的方式来使用:
@Autowired
WebClient webClient;
public Mono<Ingredient> getIngredientById(String ingredientId) {
Mono<Ingredient> ingredient = webClient
.get()
.uri("/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);
ingredient.subscribe(i -> { ... });
}
因为WebClient已经创建好了,所以我们可以通过get()方法直接使用它。对于URI来说,我们只需要调用uri()指定相对于基础URI的相对路径。
对长时间运行的请求进行超时处理
我们需要考虑的一件事情就是,网络并不是始终可靠的,或者并不像我们预期的那么快,远程服务器在处理请求时有可能会非常缓慢。理想情况下,对远程服务的请求会在一个合理的时间内返回。即使无法正常返回,我们也希望客户端能够避免陷入长时间等待响应的窘境。
为了避免客户端请求被缓慢的网络或服务阻塞,我们可以使用Flux或Mono的timeout()方法,为等待数据发布的过程设置一个时长限制。作为样例,我们考虑一下如何为获取配料数据使用timeout()方法:
Flux<Ingredient> ingredients = webclient
.get()
.uri("/ingredients")
.retrieve()
.bodyToFlux(Ingredient.class);
ingredients
.timeout(Duration.ofSeconds(1))
.subscribe(
i -> { ... },
e -> {
// handle timeout error
});
可以看到,在订阅Flux之前,我们调用了timeout()方法,将持续时间设置成了1秒。如果请求能够在1秒内返回,就不会有任何问题。如果请求耗时超过1秒,程序就会超时,从而调用传递给subscribe()的第二个参数——错误处理器。
发送资源
使用WebClient发送数据与接收数据并没有太大的差异。作为样例,假设我们有一个Mono<Ingredient>,并且想要将Mono发布的Ingredient对象以POST请求的形式发送到相对路径为“/ingredients”的URI上。我们需要做的就是使用post()方法来替换get(),并通过body()方法指明要使用Mono来填充请求体:
Mono<Ingredient> ingredientMono = Mono.just(
new Ingredient("INGC", "Ingredient C", Ingredient.Type.VEGGIES));
Mono<Ingredient> result = webClient
.post()
.uri("/ingredients")
.body(ingredientMono, Ingredient.class)
.retrieve()
.bodyToMono(Ingredient.class);
result.subscribe(i -> { ... });
如果没有要发送的Mono或Flux,而只有原始的领域对象,那么可以使用bodyValue()方法。例如,假设我们没有Mono<Ingredient>,而有一个想要在请求体中发送的Ingredient对象,那么可以这样做:
Ingedient ingredient = ...;
Mono<Ingredient> result = webClient
.post()
.uri("/ingredients")
.bodyValue(ingredient)
.retrieve()
.bodyToMono(Ingredient.class);
result.subscribe(i -> { ... });
如果我们不使用POST请求,而是使用PUT请求更新一个Ingredient,就可以用put()来替换post(),并相应地调整URI路径:
Mono<Void> result = webClient
.put()
.uri("/ingredients/{id}", ingredient.getId())
.bodyValue(ingredient)
.retrieve()
.bodyToMono(Void.class);
result.subscribe();
PUT请求的响应载荷一般是空的,所以我们必须要求bodyToMono()返回一个Void类型的Mono。一旦订阅该Mono,请求就会立即发送。
删除资源
WebClient还支持通过其delete()方法移除资源。例如,根据ID删除配料:
Mono<Void> result = webClient
.delete()
.uri("/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Void.class);
result.subscribe();
与PUT请求类似,DELETE请求的响应不会有载荷。同样,返回并订阅Mono<Void>就会发送请求。
处理错误
到目前为止,所有的WebClient样例都假设有正常的结果,而没有400级别和500级别的状态码。如果出现这两种类型的错误状态,WebClient会记录失败信息,然后继续执行后续的操作。
如果我们需要处理这种错误,那么可以调用onStatus()来指定处理各种类型HTTP状态码的方式。onStatus()接受两个函数,其中一个断言函数来匹配HTTP状态,另一个函数会得到ClientResponse对象,并返回Mono<Throwable>。
为了阐述如何使用onStatus()创建自定义的错误处理器,请参考如下使用WebClient根据ID获取配料的样例:
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);
如果ingredientId的值能够匹配已知的资源,那么结果得到的Mono在订阅时会发布一个Ingredient。如果找不到匹配的配料呢?
当订阅可能会出现错误的Mono或Flux时,很重要的一点就是在调用subscribe()注册数据消费者的同时注册一个错误消费者:
ingredientMono.subscribe(
ingredient -> {
// handle the ingredient data
...
},
error-> {
// deal with the error
...
});
如果能够找到配料资源,则调用传递给subscribe()的第一个lambda表达式(数据消费者),并且将匹配的Ingredient对象传递过来。但是,如果找不到资源,那么请求将会得到一个HTTP 404 (NOT FOUND)状态码的响应,它将会导致第二个lambda表达式(错误消费者)被调用,并且传递默认的WebClientResponseException。
WebClientResponseException最大的问题在于它无法明确指出导致Mono失败的原因。它的名字表明在WebClient发起的请求中出现了响应错误,但是我们需要深入WebClientResponseException才能知道哪里出现了错误。如果给错误信息的消费者所提供的异常能够更加专注业务领域而不是专注WebClient,那就更好了。
我们可以添加一个自定义的错误处理器,并在这个处理器中提供将状态码转换为自己所选择的Throwable的代码。如果请求配料资源时试图获取Mono失败,就生成一个UnknownIngredientException。可以在调用retrieve()之后添加一个对onStatus()的调用,从而实现这一点:
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.onStatus(HttpStatus::is4xxClientError,
response -> Mono.just(new UnknownIngredientException()))
.bodyToMono(Ingredient.class);
调用onStatus()时第一个参数是断言,它接收HttpStatus,如果状态码是我们想要处理的,则会返回true。如果状态码匹配,响应将会传递给第二个参数的函数并按需处理,最终返回Throwable类型的Mono。
在样例中,如果状态码是400级别的(比如客户端错误),则会返回包含UnknownIngredientException的Mono。这会导致ingredientMono因为该异常而失败。
需要注意,HttpStatus::is4xxClientError是对HttpStatus的is4xxClientError的方法引用。这意味着此时会基于HttpStatus对象调用该方法。我们也可以使用HttpStatus的其他方法作为方法引用,还可以以lambda表达式或方法引用的形式提供其他能够返回boolean值的函数。
例如,在错误处理中,我们可以更加精确地检查HTTP 404 (NOT FOUND)状态,只需将对onStatus()的调用修改成如下形式:
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.onStatus(status -> status == HttpStatus.NOT_FOUND,
response -> Mono.just(new UnknownIngredientException()))
.bodyToMono(Ingredient.class);
值得注意的是,可以按需调用onStatus()任意次,以便处理响应中可能返回的各种HTTP状态码。
交换请求
到目前为止,在使用WebClient的时候,我们都是使用它的retrieve()方法发送请求。在这些场景中,retrieve()方法会返回一个ResponseSpec类型的对象,通过调用它的onStatus()、bodyToFlux()和bodyToMono()方法,我们就能处理响应。对于简单的场景,使用ResponseSpec就足够了,但是它在很多方面都有局限性。如果我们想要访问响应的头信息或cookie的值,那么ResponseSpec就无能为力了。
在使用ResponseSpec遇到困难时,可以通过调用exchangeToMono()或exchange ToFlux()方法来代替retrieve()。exchangeToMono()方法会返回ClientResponse类型的Mono,我们可以对它采用各种反应式操作,以便探测和使用整个响应中的数据,包括载荷、头信息和cookie。exchangeToFlux()方法的工作方式大致与此相同,但为了处理响应中的多个数据项,该方法会返回ClientResponse类型的Flux对象。
在了解exchangeToMono()和exchangeToFlux()方法与retrieve()的差异之前,我们先看一下它们之间的相似之处。如下的代码片段通过WebClient和exchangeToMono()方法,根据ID获取配料:
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.exchangeToMono(cr -> cr.bodyToMono(Ingredient.class));
这几乎与使用retrieve()的样例相同:
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.retrieve()
.bodyToMono(Ingredient.class);
在exchangeToMono()样例中,我们不是使用ResponseSpec对象的bodyToMono()方法来获取Mono<Ingredient>,而是得到了一个Mono<ClientResponse>,通过调用它的bodyToMono()获取到我们想要的Mono。
现在,我们看一下exchangeToMono()与retrieve()的差异在什么地方。假设请求的响应中包含一个名为X_UNAVAILABLE的头信息,如果它的值为true,则表明该配料是不可用的(因为某种原因)。为了讨论方便,假设我们希望在这个头信息存在的情况下得到空的Mono,即不返回任何内容。通过添加另外一个flatMap()调用,我们就能实现这一点。WebClient的完整调用过程如下所示:
Mono<Ingredient> ingredientMono = webClient
.get()
.uri("http://localhost:8080/ingredients/{id}", ingredientId)
.exchangeToMono(cr -> {
if (cr.headers().header("X_UNAVAILABLE").contains("true")) {
return Mono.empty();
}
return Mono.just(cr);
})
.flatMap(cr -> cr.bodyToMono(Ingredient.class));
exchangeToMono()的调用会探查给定ClientRequest对象的响应头,查看是否存在值为true的X_UNAVAILABLE头信息。如果存在,就返回一个空的Mono;否则,返回一个包含ClientResponse的新Mono。不管是哪种情况,返回的Mono都会成为后续flatMap()操作所要使用的那个Mono。