创建简单的RSocket服务器和客户端
Spring为RSocket的消息传递提供了良好的支持,涵盖了所有的4种通信模型。要开始使用RSocket,我们需要在项目构建文件中添加Spring Boot RSocket starter。在Maven的POM文件中,RSocket starter依赖如程序清单14.1所示。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
使用RSocket通信的服务器和客户端应用都需要相同的依赖。
注意:在从Spring Initializr中选择依赖时,你可能会看到一个类似WebSocket的依赖。尽管RSocket和WebSocket的名称很相似,而且使用WebSocket作为RSocket的传输方式也是可行的(14.3节中会进一步介绍),但在使用RSocket时不需要选择WebSocket依赖。 |
接下来,我们需要决定哪种通信模型最适合我们的应用。没有最好的方案,只有最适合的方案,我们要根据应用所需的通信行为来权衡选择。然而,正如我们在接下来的几个例子中会看到的,各种通信模型的开发模式间并没有太大差异,所以我们即便没有一次选到最理想的方案,也可以轻松地改变选择。
我们分别看一下如何使用各种通信模型在Spring中创建RSocket服务器和客户端。由于RSocket的每种通信模型都是不同的,并且分别适用于特定的使用场景,因此我们暂时将Taco Cloud应用程序放在一边,看看如何在不同问题领域中使用RSocket。首先,我们会看到如何使用请求-响应通信模型。
使用请求-响应通信模型
在Spring中创建RSocket服务器就像创建控制器类一样简单,这与Web应用或REST服务的创建方式基本相同。如程序清单14.2所示的控制器是一个RSocket服务的例子,它处理来自客户端的问候(greeting)请求,并以另一个问候作为响应。
package rsocket;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Controller
@Slf4j
public class GreetingController {
@MessageMapping("greeting")
public Mono<String> handleGreeting(Mono<String> greetingMono) {
return greetingMono
.doOnNext(greeting ->
log.info("Received a greeting: {}", greeting))
.map(greeting -> "Hello back to you!");
}
}
我们可以看到,Web控制器和RSocket控制器的关键区别在于,RSocket控制器不是处理指定路径的HTTP请求(使用@GetMapping或@PostMapping),而是使用@MessageMapping注解处理指定路由上的传入消息。在本例中,当一个请求从客户端发送到名为“greeting”的路由时,handleGreeting()方法会被调用。
handleGreeting()方法通过一个Mono<String>参数接收来自客户端的消息载荷。在本例中,我们的问候内容很简单,使用字符串就足够了,但如果有需要,传入的载荷也可以是更复杂的类型。在收到Mono<String>后,该方法简单地记录了它收到的问候内容,然后在Mono上使用map()函数创建新的Mono<String>,以携带返回给客户端的响应信息。
RSocket控制器尽管不处理某个路径上的HTTP请求,但可以使路由名称具有与路径相类似的外观,包括可以传入处理器方法的变量占位符。例如,我们对handleGreeting()方法做一些修改:
@MessageMapping("greeting/{name}")
public Mono<String> handleGreeting(
@DestinationVariable("name") String name,
Mono<String> greetingMono) {
return greetingMono
.doOnNext(greeting ->
log.info("Received a greeting from {} : {}", name, greeting))
.map(greeting -> "Hello to you, too, " + name);
}
在本例中,@MessageMapping指定的路由中包含一个名为“name”的占位符变量。它是通过花括号表示的,与Spring MVC控制器中指定路径变量的方式相同。同样,该方法会接受一个用@DestinationVariable注解标注的String参数,该参数会引用占位符变量的值。就像Spring MVC的@PathVariable注解,@DestinationVariable用来提取路由占位符中指定的值,并将其传入处理器方法。进入这个新版本的handleGreeting()方法之后,路由中指定的名字将被用来向客户端返回更具个性化的问候响应。
另外,我们需要记得在创建RSocket服务器时指定要监听的端口。默认情况下,RSocket服务是基于TCP的,并且服务器监听一个特定的端口。spring.rsocket.server.port配置项可以设置RSocket服务器的端口,如下所示:
spring:
rsocket:
server:
port: 7000
spring.rsocket.server.port属性有两个作用:启用服务器、指定服务器需要监听的端口。如果没有设置该属性,那么Spring将认为该应用只作为客户端,没有服务器端口需要被监听。在本例中,我们要启动服务器,因此如前面的代码所示,设置spring.rsocket. server.port属性将启动一个监听端口为7000的服务器。
现在让我们把注意力转向RSocket客户端。在Spring中,RSocket客户端是通过RSocketRequester实现的。Spring Boot对RSocket的自动配置将在Spring应用上下文中自动创建一个RSocketRequester.Builder类型的bean。你可以将该构建器bean注入需要它的其他bean,以创建RSocketRequester的实例。
例如,如下是ApplicationRunner bean的初始代码,它注入了一个RSocketRequester. Builder实例。
package rsocket;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
@Configuration
@Slf4j
public class RSocketClientConfiguration {
@Bean
public ApplicationRunner sender(RSocketRequester.Builder requesterBuilder)
{
return args -> {
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
// ... send messages with RSocketRequester ...
};
}
}
在本例中,构建器被用来创建一个监听localhost中7000端口的RSocketRequester实例。然后,生成的RSocketRequester实例即可用来向服务器发送消息。
在请求-响应模型中,请求需要(至少)指定路由和数据载荷。回忆一下,我们服务器的控制器正在等待处理那些路由至“greeting”的请求,并期待有String类型的输入。该控制器也会返回String类型的输出。如程序清单14.3所示的完整客户端代码展示了如何向服务器发送问候请求并处理响应。
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
// ... send messages with RSocketRequester ...
tcp
.route("greeting")
.data("Hello RSocket!")
.retrieveMono(String.class)
.subscribe(response -> log.info("Got a response: {}", response));
这将向“greeting”路由上的服务器发送内容为“Hello RSocket!”的问候数据。请注意,它预期返回Mono<String>,这是在调用retrieveMono()时指定的。随后subscribe()方法订阅了返回的Mono,并通过输出日志来处理其响应的载荷。
现在,假设你想向另一个路由发送一个问候数据,该路由在其路由配置中接受一个变量值。客户端代码的工作方式基本相同,只是在给route()的参数值中包含了变量占位符,以及它应该包含的实际数据值,如下所示:
String who = "Craig";
tcp
.route("greeting/{name}", who)
.data("Hello RSocket!")
.retrieveMono(String.class)
.subscribe(response -> log.info("Got a response: {}", response));
此时,消息将被发送到名为“greeting/Craig”的路由中,它将由对应的控制器处理方法进行处理,该方法的@MessageMapping注解指定了它能够处理的路由为“greeting/{name}”。虽然我们也可以在路由中硬编码路由名称,或者使用String拼接来创建路由名称,但在客户端使用占位符,可以很容易地插入一个值,而不会出现String拼接的杂乱情况。
请求-响应模型可能是RSocket通信模型中较容易理解的。这只是一个开始。接下来我们看看如何用请求-流模型处理可能返回多个响应的请求。
处理请求-流的消息
并非所有的交互特性都具有单一请求和单一响应。例如,在一个股票报价的场景中,针对特定的股票代码,如果能够得到一个股票的报价流,那会是非常有用的。若使用请求-响应模型,客户端就需要反复轮询当前的股票价格。但使用请求-流模型,客户只需询问一次股票价格,就能订阅一个定期更新报价的流。
为了阐述请求-流模型,我们将为股票报价场景实现服务器和客户端。首先,我们需要定义一个可以携带股票报价信息的对象。程序清单14.4中的StockQuote类可以帮助我们做到这一点。
package rsocket;
import java.math.BigDecimal;
import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class StockQuote {
private String symbol;
private BigDecimal price;
private Instant timestamp;
}
我们可以看到,StockQuote 类带有股票代码、价格,以及表示价格有效期的时间戳。为简洁起见,我们使用 Lombok 来创建构造器和访问器方法。
现在,我们写一个控制器来处理股票报价的请求。你会发现程序清单14.5中的StockQuoteController与14.2.1小节中的GreetingController非常相似。
package rsocket;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
@Controller
public class StockQuoteController {
@MessageMapping("stock/{symbol}")
public Flux<StockQuote> getStockPrice(
@DestinationVariable("symbol") String symbol) {
return Flux
.interval(Duration.ofSeconds(1))
.map(i -> {
BigDecimal price = BigDecimal.valueOf(Math.random() * 10);
return new StockQuote(symbol, price, Instant.now());
});
}
}
在这里,getStockPrice()方法处理来自“stock/{symbol}”路由的传入请求,并通过@DestinationVariable注解接受路由中的股票代码。为简单起见,我们不去查询实际的股票价格,而是通过随机值计算以得到股票的价格(或许就能准确模拟一些实际股票的波动)。
对于getStockPrice()方法,最值得注意的是,它返回Flux<StockQuote>而不是Mono<StockQuote>。这对Spring来说是一条线索,说明这个处理器方法支持请求-流模型。在内部,Flux最初是使用一个间隔操作符(interval)创建的,每秒发布一次数据,这个Flux被映射到另一个随机产生的StockQuote Flux上。简单地说,由getStockPrice()方法处理的请求会以每秒一次的频率返回多个值。
请求-流服务的客户端与请求-响应服务的客户端类似。唯一的重要差异是,请求-流服务的客户端需要调用requester的retreiveFlux()方法而不是retrieveMono()方法。股票报价服务的客户端看起来可能如下所示:
String stockSymbol = "XYZ";
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp
.route("stock/{symbol}", stockSymbol)
.retrieveFlux(StockQuote.class)
.doOnNext(stockQuote ->
log.info(
"Price of {} : {} (at {})",
stockQuote.getSymbol(),
stockQuote.getPrice(),
stockQuote.getTimestamp())
)
.subscribe();
至此,我们已经看到如何创建处理单个或多个响应的RSocket服务器和客户端。但是,如果服务器没有响应要返回,或者客户端不需要响应,应该怎么办呢?接下来,我们看一下如何处理即发即忘的通信模型。
发送即发即忘的消息
想象一下,你现在正位于一艘星际舰船上,刚刚遭受了敌人舰船的攻击。你发出了全舰“红色警报”,所有人员都进入了战斗状态。你不需要等待舰上的计算机返回收到警报状态的响应,因为在这种情况下你根本没有时间等待和处理任何形式的响应。触发了警报后,你就必须要继续处理更重要的事情了。
这就是“即发即忘”的一个样例。尽管你可能不会忘记你正处于红色警报的状态,但鉴于目前的情况,处理战争危机比处理触发警报的响应更重要。
为了模拟这种情况,我们将创建一个RSocket服务器以处理警报状态但不返回任何内容。首先,我们需要定义一个携带请求载荷的类,如程序清单14.6的Alert类所示。
package rsocket;
import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Alert {
private Level level;
private String orderedBy;
private Instant orderedAt;
public enum Level {
YELLOW, ORANGE, RED, BLACK
}
}
Alert对象包含了警报级别、下令触发警报的人,以及警报触发时刻的时间戳(定义为Instant类型)。同样,我们使用Lombok创建构造器和访问器方法以保持程序清单的简洁。
在服务器端,程序清单14.7中的AlertController会处理警报信息。
package rsocket;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Controller
@Slf4j
public class AlertController {
@MessageMapping("alert")
public Mono<Void> setAlert(Mono<Alert> alertMono) {
return alertMono
.doOnNext(alert ->
log.info("{} alert ordered by {} at {}",
alert.getLevel(),
alert.getOrderedBy(),
alert.getOrderedAt())
)
.thenEmpty(Mono.empty());
}
}
setAlert()方法会处理“alert”路由上的Alert消息。为了保持简洁(尽管在实际的战斗中这没有什么用处),该方法只会以日志的形式记录警报。但需要特别注意的是,它返回一个Mono<Void>来表示没有响应,因此,这个处理器方法支持即发即忘模型。
在客户端,即发即忘模型的代码与请求-响应或请求-流模型没有太大区别,如下所示:
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp
.route("alert")
.data(new Alert(
Alert.Level.RED, "Craig", Instant.now()))
.send()
.subscribe();
log.info("Alert sent");
然而,需要注意,客户端没有调用retrieveMono()或retrieveFlux(),而只调用了send()方法,表明我们预期无须得到响应。
现在,我们来看看如何处理通道通信模型。在这种模型中,服务器和客户端都能互相发送多条消息。
双向发送消息
目前我们看到的所有通信模型中,客户端都是发送单一的请求,而服务器则以零个、一个或多个响应来回应。在请求-流模型中,服务器能够向客户端以流的方式发送多个响应,但客户端仍然只能发送一个请求。不过,这种“乐趣”为什么是服务器独有的?难道客户端就不能发送多个请求吗?
这就是通道通信模型的用武之地了。在通道通信模型中,客户端可以通过流向服务器发送多个请求,服务器也可以在双方的双向对话中发送多个响应。这种模型是RSocket通信模型中最灵活的一种,当然也是最复杂的一种。
为了演示在Spring中如何使用RSocket通道模型进行通信,我们创建一个计算账单小费的服务,它能够接收请求Flux并返回响应Flux。首先,我们需要定义代表请求和响应的模型对象。程序清单14.8展示的GratuityIn类代表了由客户端发送并由服务器接收的请求。
package rsocket;
import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class GratuityIn {
private BigDecimal billTotal;
private int percent;
}
GratuityIn带有计算小费所需的两个基本信息:账单总额和小费百分比。程序清单14.9展示的GratuityOut类表示响应,与GratuityIn中给出的值对应。该类同时还有一个代表小费数额的gratuity属性。
package rsocket;
import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class GratuityOut {
private BigDecimal billTotal;
private int percent;
private BigDecimal gratuity;
}
程序清单14.10中的GratuityController用于处理小费计算的请求,看起来与本章中我们编写的其他几个控制器很相似。
package rsocket;
import java.math.BigDecimal;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Controller
@Slf4j
public class GratuityController {
@MessageMapping("gratuity")
public Flux<GratuityOut> calculate(Flux<GratuityIn> gratuityInFlux) {
return gratuityInFlux
.doOnNext(in -> log.info("Calculating gratuity: {}", in))
.map(in -> {
double percentAsDecimal = in.getPercent() / 100.0;
BigDecimal gratuity = in.getBillTotal()
.multiply(BigDecimal.valueOf(percentAsDecimal));
return new GratuityOut(
in.getBillTotal(), in.getPercent(), gratuity);
});
}
}
然而,它与前面的例子有一个明显的区别:它不仅返回Flux,而且还接受Flux作为输入。与请求-流模型相同,返回的Flux使控制器能够将多个值以流的形式发送回客户端。但是,通道模型与请求-流模型的关键差异体现在传入Flux类型的参数上。使用Flux类型的参数允许控制器处理来自客户端的请求流,该请求流会被传入处理器方法。
使用通道模型的客户端与使用请求-流模型的客户端的差异在于前者需要向服务器发送一个Flux<GratuityIn>而不是Mono<GratuityIn>,如程序清单14.11所示。
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
Flux<GratuityIn> gratuityInFlux =
Flux.fromArray(new GratuityIn[] {
new GratuityIn(BigDecimal.valueOf(35.50), 18),
new GratuityIn(BigDecimal.valueOf(10.00), 15),
new GratuityIn(BigDecimal.valueOf(23.25), 20),
new GratuityIn(BigDecimal.valueOf(52.75), 18),
new GratuityIn(BigDecimal.valueOf(80.00), 15)
})
.delayElements(Duration.ofSeconds(1));
tcp
.route("gratuity")
.data(gratuityInFlux)
.retrieveFlux(GratuityOut.class)
.subscribe(out ->
log.info(out.getPercent() + "% gratuity on "
+ out.getBillTotal() + " is "
+ out.getGratuity()));
在本例中,Flux<GratuityIn>是使用fromArray()方法静态创建的,但它可以是从任何数据源创建的Flux,也可以是从反应式数据存储库检索得到的。
你可能已经观察到了一种模式——服务器端控制器的处理方法所接受和返回的反应式类型决定了它所支持的RSocket通信模型。表14.1总结了服务器的输入输出类型和RSocket通信模型之间的关系。
RSocket模型 | 处理器参数 | 处理器的返回值 |
---|---|---|
请求-响应 |
Mono |
Mono |
请求-流 |
Mono |
Flux |
即发即忘 |
Mono |
Mono<Void> |
通道 |
Flux |
Flux |
你可能想知道,服务器是否可以接受Flux并返回Mono。简单来说,没有这样的方案。尽管我们可以想象出在传入的Flux上处理多个请求,并以Mono<Void>进行响应,使其组合成一个通道和即发即忘模型混合体的场景,但并没有RSocket模型可以应对这种情况。因此,RSocket不支持这种方式。