使用常见的反应式操作
Flux和Mono是Reactor提供的最基础的构建块,这两种反应式类型所提供的操作就像黏合剂,使我们能够据此创建数据流的管道。Flux和Mono共有超过500个操作,这些操作大致可以归类为:
-
创建操作;
-
组合操作;
-
转换操作;
-
逻辑操作。
虽然逐一介绍这500多个操作会非常有趣,但是本章的篇幅有限,所以我在本节中选择了一些相对实用的操作来进行说明。让我们从创建操作开始吧。
注意:Mono的例子呢? Mono和Flux的很多操作都是相同的,我们没有必要分别针对Mono和Flux进行介绍。此外,虽然Mono的操作也很有用,但是相比而言,Flux上的操作更有趣。我们的大多数示例都会使用Flux,你只需要知道,Mono通常都具有相同的名称的操作。
创建反应式类型
在Spring中使用反应式类型时,我们通常可以从存储库或服务中获得Flux或Mono,并不需要自行创建。但偶尔,我们可能需要创建一个新的反应式Publisher。
Reactor提供了多种创建Flux和Mono的操作。本节将介绍一些创建操作。
根据对象创建
如果我们有一个或多个对象,并想据此创建Flux或Mono,那么可以使用Flux或Mono上的静态 just()方法来创建一个反应式类型,它们的数据会由这些对象来驱动。例如,下面的测试方法基于5个String对象创建了Flux:
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
}
现在,我们已经创建了Flux,但是它还没有订阅者。如果没有任何的订阅者,那么数据将不会流动。回想一下花园软管的比喻,假设我们已经将花园软管连接到水龙头上,水龙头的另一侧是来自水厂的水,但是在打开水龙头之前,水不会流动。订阅反应式类型就如同打开数据流的水龙头。
要添加一个订阅者,我们可以在Flux上调用subscribe()方法:
fruitFlux.subscribe(
f -> System.out.println("Here's some fruit: " + f)
);
这里传递给 subscribe()方法的lambda表达式实际上是一个java.util.Consumer,用来创建反应式流的Subscriber。在调用subscribe()之后,数据会开始流动。在这个例子中,没有中间操作,所以数据从Flux直接流向订阅者。
将来自Flux或Mono的数据项打印到控制台是观察反应式类型运行方式的好办法,但实际测试Flux或Mono的更好的方法是使用Reactor提供的StepVerifier。 对于给定的Flux或Mono,StepVerifier将会订阅该反应式类型,在数据流过时对数据使用断言,并在最后验证反应式流是否按预期完成。
例如,要验证预定义的数据流经fruitFlux,可以编写如下所示的测试代码:
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
在这个例子中,StepVerifier订阅了fruitFlux,然后断言Flux中的每个数据项是否与预期的水果名称相匹配。 最后,它验证Flux在发布完 “Strawberry” 之后,整个fruitFlux正常完成。
对于本章的其他例子,我们都可以使用StepVerifier来编写测试,验证Flux或者Mono行为,研究相应的工作原理,从而帮助我们学习和了解Reactor中最有用的操作。
根据集合创建
我们还可以根据数组、Iterable 或者 Java Stream创建Flux。图11.3使用弹珠图展示了如何使用这种方式进行创建。

要根据数组创建Flux,可以调用Flux上的静态方法fromArray(),并为其传入一个源数组:
@Test
public void createAFlux_fromArray() {
String[] fruits = new String[] {
"Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux = Flux.fromArray(fruits);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
该源数组包含的水果名称与之前使用对象列表创建Flux时的水果名称是相同的,所以该Flux发布的数据会有相同的值。因此,我们可以使用和之前相同的StepVerifier来验证该Flux。
如果需要根据java.util.List、java.util.Set或者其他任意java.lang.Iterable的实现来创建Flux,那么可以将其传递给静态的fromIterable()方法:
@Test
public void createAFlux_fromIterable() {
List<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> fruitFlux = Flux.fromIterable(fruitList);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
如果我们有一个 Java Stream,并且希望将其用作Flux源,那么可以调用fromStream() 方法:
@Test
public void createAFlux_fromStream() {
Stream<String> fruitStream =
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<String> fruitFlux = Flux.fromStream(fruitStream);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
同样,我们可以使用和之前一样的StepVerifier来验证该Flux发布的数据。
生成Flux的数据
有时候我们根本没有可用的数据,只是想使用Flux作为一个计数器,使它每次发送新值时自增1。要创建计数器Flux,我们可以使用静态方法range()。图11.4说明了range()方法的工作原理。

下面的测试方法展示了如何创建一个区间Flux:
@Test
public void createAFlux_range() {
Flux<Integer> intervalFlux =
Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}
在这个例子中,我们创建了一个区间Flux,它的起始值为1,结束值为5。StepVerifier 证明了它将发布5个条目,即整数1到5。
另一个与range()方法类似的Flux创建方法是interval()。与range()方法一样,interval()方法会创建一个发布递增值的Flux。但是,interval()的特殊之处在于,我们不是为它设置起始值和结束值,而是指定一个间隔时间,明确应该每隔多长时间发出值。图11.5展示了interval()方法创建Flux原理的弹珠图。

例如,要创建一个每秒发布一个值的Flux,可以使用Flux上的静态interval()方法,如下所示:
@Test
public void createAFlux_interval() {
Flux<Long> intervalFlux =
Flux.interval(Duration.ofSeconds(1))
.take(5);
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
}
需要注意的是,通过interval()方法创建的Flux会从0开始发布值,并且后续的条目依次递增。此外,interval()方法没有指定最大值,所以可能会永远运行。我们可以使用take()方法来将结果限制为前5个条目。我们将在11.3.3小节中详细讨论take()方法。
组合反应式类型
有时候,我们可能需要操作两种反应式类型,并以某种方式合并它们。或者,在其他情况下,我们可能需要将Flux拆分为多种反应式类型。在本小节,我们将研究组合及拆分Reactor的Flux和Mono的操作。
合并反应式类型
假设我们有两个Flux流,并且需要据此创建一个能在任意一个上游Flux流可用时产生相同数据的Flux。要将一个Flux与另一个Flux合并,可以使用mergeWith()方法,如图11.6所示。

例如,假设有一个以影视作品中的角色名为值的Flux,还有一个以这些角色喜欢的食物为值的Flux。如下所示的测试方法展示了如何使用mergeWith()方法合并两个Flux对象:
@Test
public void mergeFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500));
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete();
}
通常,Flux会尽可能快地发布数据。所以,在这里,我们在两个Flux流上使用delayElements()方法来减慢它们的速度,使它们每500毫秒发布一个条目。 此外,为了使食物Flux开始流式传输的时间在角色名Flux之后,我们调用了食物Flux上的delaySubscription方法,使它在订阅后250毫秒才开始发布数据。
在合并了两个Flux对象后,将会得到一个新的Flux。StepVerifier订阅这个合并后的Flux时,它将依次订阅两个源Flux流并启动数据流。
对于合并后的Flux来说,其数据项的发布顺序与源Flux的发布时间一致。因为两个Flux对象都设置为以固定速率发布数据,所以这些值在合并后的Flux中会交错在一起,形成角色名和食物交替出现的结果。如果任何一个Flux的发布时机发生变化,那么就可能会看到Flux接连发布了两个角色名或者两个食物。
因为mergeWith()方法不能完美地保证源Flux之间的先后顺序,所以我们可以考虑使用zip()方法。当两个Flux对象压缩在一起的时候,它将会产生一个新的发布元组的Flux,其中每个元组中都包含了来自每个源Flux的数据项。 图11.7说明了如何将两个Flux对象压缩在一起。

要查看zip()操作实际是如何运行的,可以考虑使用如下的测试方法,它将角色Flux和食物Flux合并在了一起:
@Test
public void zipFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p ->
p.getT1().equals("Garfield") &&
p.getT2().equals("Lasagna"))
.expectNextMatches(p ->
p.getT1().equals("Kojak") &&
p.getT2().equals("Lollipops"))
.expectNextMatches(p ->
p.getT1().equals("Barbossa") &&
p.getT2().equals("Apples"))
.verifyComplete();
}
需要注意的是,与mergeWith()方法不同,zip()方法是一个静态的创建操作。创建出来的Flux在角色名和角色喜欢的食物之间会完美对齐。从这个合并后的Flux发出的每个条目都是一个Tuple2(一个容纳两个其他对象的容器对象)的实例,其中包含了来自每个源Flux的数据项,并保持着它们发布的顺序。
如果你不想使用Tuple2,而想使用其他类型,可以为zip方法提供一个合并函数来生成你想要的任何对象,合并函数会传入这两个数据项(如图11.8所示)。

例如,下面的测试方法展示了角色名Flux与食物Flux如何合并在一起,并生成一个包含String对象的Flux:
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
StepVerifier.create(zippedFlux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete();
}
传递给 zip()方法(在这里是一个lambda表达式)的Function会简单地将两个数据项组装成一个句子,然后通过合并后的Flux发布。
选择第一个反应式类型进行发布
假设有两个Flux对象,但我们并不想将它们合并在一起,而是想要创建一个新的Flux,将第一个产生数值的Flux中的数值发布出去。 如图11.9所示,first操作会在两个Flux对象中选择第一个发布值的Flux,并再次发布它的值。

下面的测试方法创建了一个快速的Flux和一个“缓慢”的Flux(其中“缓慢”意味着它在被订阅后100毫秒才会发布数据项)。使用first操作的相关方法,则会创建一个新的Flux,只发布第一个发布值的源Flux的值:
@Test
public void firstWithSignalFlux() {
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
Flux<String> firstFlux = Flux.firstWithSignal(slowFlux, fastFlux);
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}
在这种情况下,因为慢速Flux会在快速Flux开始发布之后的100毫秒才发布值,所以新创建的Flux将会简单地忽略慢的Flux,并仅发布来自快速Flux的值。
转换和过滤反应式流
在数据流经一个流时,我们通常需要过滤掉某些值并对其他的值进行处理。在本小节,我们将介绍流经反应式流的数据转换和过滤操作。
从反应式类型中过滤数据
数据从Flux流出时,对其进行过滤的一个基本方法是简单地忽略指定数目的前几个数据项。skip()操作(如图11.10所示)就能完成这样的工作。

针对具有多个数据项的Flux,skip()操作将创建一个新的Flux,首先跳过指定数量的前几个数据项,然后从源Flux中发布剩余的数据项。下面的测试方法展示了如何使用skip()方法:
@Test
public void skipAFew() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
在本例中下,我们有一个包含5个String数据项的Flux。在这个Flux上调用skip(3)方法后会产生一个新的Flux,跳过前3个数据项,只发布最后2个数据项。
但是,你可能并不想跳过特定数量的条目,而是想要跳过一段时间之内出现的数据。这是skip()操作的另一种形式。如图11.11所示,该操作会产生一个新Flux,它会等待一段指定的时间后发布来自源 Flux 中的数据条目。

下面的测试方法使用skip()操作创建了一个在发布值之前等待4秒的Flux。因为该Flux是基于一个在发布数据项之间有一秒间隔的Flux创建的(使用了delayElements()操作),所以它只会发布出最后两个数据项:
@Test
public void skipAFewSeconds() {
Flux<String> countFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1))
.skip(Duration.ofSeconds(4));
StepVerifier.create(countFlux)
.expectNext("ninety nine", "one hundred")
.verifyComplete();
}
我们已经看过了take()操作的示例,但是根据对skip()操作的描述来看,take()可以认为是与skip()相反的操作。skip()操作会跳过前几个数据项,而take()操作只发布指定数量的前几个数据项(如图11.12所示):
@Test
public void take() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Acadia")
.take(3);
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}

与skip()方法一样,take()方法也有另一种替代形式,基于间隔时间而不是数据项数量。它将在某段时间之内接受并发布与源Flux相同的数据项,之后Flux就会完成。如图11.13所示。

下面的测试方法使用了这种形式的take()方法,它将会在订阅之后的3.5秒内发布数据条目。
@Test
public void takeForAwhile() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Grand Canyon")
.verifyComplete();
}
skip()操作和take()操作都可以认为是过滤操作,其过滤条件基于计数或者持续时间。而Flux值的更通用过滤操作则是filter()。
在使用filter()操作时,我们需要指定一个Predicate,用于决定数据项是否能通过Flux,该操作能够让我们根据任意条件进行选择性地发布消息。图11.14展示了filter()操作的工作原理。

要查看filter()的实际效果,可以参考下面的测试方法:
@Test +
public void filter() {
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon", "Zion", "Grand Teton")
.filter(np -> !np.contains(" "));
StepVerifier.create(nationalParkFlux)
.expectNext("Yellowstone", "Yosemite", "Zion")
.verifyComplete();
}
在这里,我们将只接受不包含空格的字符串的Predicate作为lambda表达式传给filter()方法。因此在结果Flux中,“Grand Canyon”和“Grand Teton”被过滤掉了。
我们可能还想要过滤掉已经接收过的数据项。distinct()操作生成的Flux只会发布源Flux中尚未发布过的数据项,如图11.15所示。

在下面的测试中,调用distinct()方法产生的Flux只会发布不同String值:
@Test
public void distinct() {
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "anteater")
.distinct();
StepVerifier.create(animalFlux)
.expectNext("dog", "cat", "bird", "anteater")
.verifyComplete();
}
虽然“dog”和“bird”从源Flux中都发布了2次,但是在调用distinct()方法产生的Flux中,它们只发布一次。
映射反应式数据
在Flux或Mono中的一个常见操作是将已发布的数据项转换为其他的形式或类型。Reactor的反应式类型(Flux和Mono)为此提供了map()和flatMap()操作。
map()操作会创建一个新的Flux,该Flux在重新发布它所接收到的每个对象之前,会对其执行由给定Function预先定义的转换。图11.16说明了map()操作的工作原理。

在下面的测试方法中,包含篮球运动员名字的String值的Flux被转换为一个包含Player对象的新Flux。
@Test
public void map() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(playerFlux)
.expectNext(new Player("Michael", "Jordan"))
.expectNext(new Player("Scottie", "Pippen"))
.expectNext(new Player("Steve", "Kerr"))
.verifyComplete();
}
@Data
private static class Player {
private final String firstName;
private final String lastName;
}
以lambda表达式形式传递给map()方法的函数会将传入的String值按照空格拆分,并使用生成的String数组来创建Player对象。用just()方法创建的Flux包含String对象,但map()方法产生的Flux则包含Player对象。
其中重要的一点在于,在每个数据项被源Flux发布时,map()操作是同步执行的,如果想要执行异步的转换操作,那么应该考虑使用flatMap()操作。
对于flatMap()操作,我们可能需要一些思考和练习才能完全掌握。如图11.17所示,flatMap()并不像map()操作那样简单地将一个对象转换到另一个对象,而是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()方法结合使用时,flatMap()操作可以释放Reactor反应式的异步能力。

下面的测试方法展示了如何使用flatMap()方法和subscribeOn()方法:
@Test
public void flatMap() {
Flux<Player> playerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
})
.subscribeOn(Schedulers.parallel())
);
List<Player> playerList = Arrays.asList(
new Player("Michael", "Jordan"),
new Player("Scottie", "Pippen"),
new Player("Steve", "Kerr"));
StepVerifier.create(playerFlux)
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.verifyComplete();
}
需要注意的是,我们为flatMap()方法指定了一个lambda表达式,将传入的String转换为Mono类型的String。然后,map()操作在这个Mono上执行,将String转换为Player。每个内部Flux上的String被映射到一个Player后,再被发布到由flatMap()返回的单一Flux中,从而完成结果的扁平化。
如果我们到此为止,那么产生的Flux将同样包含Player对象,与使用map()操作的例子相同,顺序同步地生成。但是我们对Mono做的最后一件事情是调用subscribeOn()方法声明每个订阅都应该在并行线程中进行,因此可以异步并行地执行多个String对象的转换操作。
尽管subscribeOn()方法的命名与subscribe()方法类似,但二者的含义却完全不同。subscribe()方法更像一个动作,可以订阅并驱动反应式流,而subscribeOn()方法则更具描述性,用于指定如何并发地处理订阅。Reactor本身并不强制使用特定的并发模型。调用subscribeOn()方法时,我们可以使用Schedulers中的任意一个静态方法来指定并发模型。在这个例子中,我们使用了parallel()方法,它使用来自固定线程池(大小与CPU核心数量相同)的工作线程。但是Scheduler支持多种并发模型,如表11.1所示。

使用flatMap()和subscribeOn()的优势在于,我们可以在多个并行线程之间拆分工作,从而增加流的吞吐量。但是,鉴于工作是并行完成的,无法保证哪项工作首先完成,所以结果Flux中数据项的发布顺序是未知的。因此,StepVerifier只能验证发出的每个数据项是否存在于预期的Player对象列表中,并且在Flux完成之前会有3个这样的数据项。
在反应式流上缓冲数据
在处理流经Flux的数据时,将数据流拆分为小块可能会带来一定的收益。如图11.18所示的buffer()操作可以帮助我们实现这个目的。

假设给定一个包含多个String值的Flux,其中每个值代表一种水果。我们可以创建一个新的由List集合组成的Flux,其中每个List包含不超过指定数量的元素:
@Test
public void buffer() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);
StepVerifier
.create(bufferedFlux)
.expectNext(Arrays.asList("apple", "orange", "banana"))
.expectNext(Arrays.asList("kiwi", "strawberry"))
.verifyComplete();
}
在本例中,String元素的Flux被缓冲到一个新的由List集合组成的Flux中,其中每个集合的元素数量不超过3个。因此,发出5个String值的原始Flux会转换为新的Flux,这个新的Flux会发出2个List集合,其中一个包含3个水果,而另一个包含2个水果。
这有什么意义?将反应式的Flux缓冲到非反应式的Flux中看起来与本章的目的南辕北辙。但在组合使用buffer()方法和flatMap()方法时,这样做可以使每一个List集合都可以被并行处理:
@Test
public void bufferAndFlatMap() throws Exception {
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()
).subscribe();
}
在这个新例子中,我们仍然将具有5个String值的Flux缓冲到一个新的由List集合组成的Flux中,但将flatMap()应用于包含List集合的Flux。这样会为每个List缓冲区中的元素创建一个新的Flux,然后对其应用map()操作。因此,每个List缓冲区都会在各个线程中被进一步并行处理。
为了观察实际效果,代码中还包含了一个log()操作,用于每个子Flux。log()操作记录了所有的反应式事件,以便观察实际发生了什么事情。日志将会记录如下的条目(为简洁起见,删除了时间戳):
[main] INFO reactor.Flux.SubscribeOn.1 -
onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.1 - request(32)
[main] INFO reactor.Flux.SubscribeOn.2 -
onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.2 - request(32)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()
正如日志记录所清晰展示的,第一个缓冲区中的水果(apple、orange和banana)在parallel-1线程中处理。与此同时,第二个缓冲区中的水果(kiwi和strawberry)在parallel-2 线程中处理。从缓冲区中交织的日志记录可以明显看出,对两个缓冲区的处理是并行执行的。
如果由于某些原因需要将Flux发布的所有数据项都收集到一个List中,那么可以使用不带参数的buffer()方法:
Flux<List<String>> bufferedFlux = fruitFlux.buffer();
这会产生一个新的Flux。这个Flux将会发布一个List,其中包含源Flux发布的所有数据项。我们也可以使用collectList()操作实现相同的功能,如图11.19所示。

collectList()方法会产生Mono而不是Flux,以发布List集合。下面的测试方法展示了它的用法:
@Test
public void collectList() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
Mono<List<String>> fruitListMono = fruitFlux.collectList();
StepVerifier
.create(fruitListMono)
.expectNext(Arrays.asList(
"apple", "orange", "banana", "kiwi", "strawberry"))
.verifyComplete();
}
有一种更有趣的方法可以收集Flux所发出的数据项:将它们收集到Map中。如图11.20所示,collectMap()操作将会产生一个发布Map的Mono。Map中会填充一些数据项,数据项的键会由给定的Function计算得出。

要查看collectMap()的效果,请参考下面的测试方法:
@Test
public void collectMap() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =
animalFlux.collectMap(a -> a.charAt(0));
StepVerifier
.create(animalMapMono)
.expectNextMatches(map -> {
return
map.size() == 3 &&
map.get('a').equals("aardvark") &&
map.get('e').equals("eagle") &&
map.get('k').equals("kangaroo");
})
.verifyComplete();
}
源Flux会发布一些动物名称。基于这个Flux,我们使用collectMap()创建了一个发布Map的新Mono,其中键由动物名称的首字母确定,而值则为动物名称本身。如果两个动物名称以相同的字母开头(如elephant和eagle、koala 和kangaroo),那么最后一个流经该流的条目将会覆盖先前的条目。
在反应式类型上执行逻辑操作
有时候我们想要知道由Mono或者Flux发布的条目是否满足某些条件。all()和any()操作可以实现这样的逻辑。图11.21和图11.22分别展示了all()和any()的工作方式。


假设我们想知道Flux发布的每个String中是否都包含了字母a和字母k。下面的测试展示了如何使用all()方法来检查这个条件:
@Test
public void all() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
StepVerifier.create(hasKMono)
.expectNext(false)
.verifyComplete();
}
在第一个StepVerifier中,我们检查了字母a。all()方法应用于源Flux,会产生布尔类型的Mono。在本例中,所有动物名称都包含了字母a,所以从生成的Mono中会发布 true。但是在第二个StepVerifier中,产生的Mono将会发出false,因为并非所有动物名称都包含了字母k。
如果至少有一个元素匹配条件即可,而不是要求所有元素均满足条件。那么在这种情况下,我们所需的操作就是any()。下面这个新的测试用例使用any()来检查字母t和字母z:
@Test
public void any() {
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasTMono = animalFlux.any(a -> a.contains("t"));
StepVerifier.create(hasTMono)
.expectNext(true)
.verifyComplete();
Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
StepVerifier.create(hasZMono)
.expectNext(false)
.verifyComplete();
}
在第一个StepVerifier中,我们会看到生成的Mono发布了true,因为有至少一种动物名称具有字母t(具体来讲,就是elephant)。而在第二个场景中,生成的Mono发布了false,因为用例中没有任何一种动物名称包含字母z。