反应式编程概览
反应式编程是一种可以替代命令式编程的编程范式。这种可替代性得以存在的原因在于:反应式编程解决了命令式编程中的一些限制。理解这些限制,有助于你更好地理解反应式编程模型的优点。
注意:反应式编程不是万能的。我们不应该从这一章或者其他任何关于反应式编程的讨论中得出“命令式编程一无是处,反应式编程才是救星”的结论。如同我们作为开发者学习到的任何技术一样,反应式编程对于某些使用场景的确十分好用,但是在另一些场景中可能不那么适合。建议以实用主义为原则选择编程范式。 |
你如果和我及大量开发者一样,从命令式编程入行,那么你现在编写的大部分(或者所有)代码在将来很可能依然是命令式的。命令式编程相当直观,没有编程经验的学生可以在学校的STEM教育课程中轻松地学习它。命令式编程也足够强大,驱动大型企业的代码大部分都是命令式的。
它的理念很简单:你可以按照顺序逐一将代码编写为需要遵循的指令列表。在某项任务开始执行之后,程序在开始下一项任务之前需要等待当前任务完成。在整个处理过程中的每一步,要处理的数据都必须是完全可用的,以便将它们作为一个整体处理。
一开始一切都很美好,但我们最终会遇到问题:执行一项任务,特别是 I/O 任务(将数据写入到数据库或者从远程服务器获取数据)时,触发这项任务的线程实际上是阻塞的,在任务完成之前不能做任何事情。坦白来说,阻塞线程是一种浪费。
大多数编程语言(包括Java)都支持并发编程。在 Java 中创建另一个线程并让它执行某些操作相当容易,而此时调用线程则可以继续执行其他工作。虽然创建线程很简单,但是这些线程中,多半最终都会阻塞。管理多线程中的并发极具挑战,而更多线程则意味着更高的复杂性。
相比之下,反应式编程本质上是函数式和声明式的。反应式编程不再描述一组依次执行的步骤,而是描述数据会流经的管道或流。反应式流不再要求将被处理的数据作为一个整体进行处理,而能够在数据可用时立即开始处理。实际上,传入的数据可能是无限的(比如某个地理位置的实时温度测量数据的恒定流)。
注意:如果你是Java函数式编程的新手,可以参阅Pierre-Yves Saumont的Functional Programming in Java(Manning,2017年)或Micha Pachta的Grokking Functional Programming(Manning,2021年)。 |
类比现实世界,可以将命令式编程看作水气球,而将反应式编程看作是花园里的软管。在夏天,这两者都是捉弄毫无戒心的朋友的好方式。但是它们的运作方式却不同:
-
水气球只能一次性地填满有效载荷,并在撞到目标时将其打湿。水气球的容量也有限,如果想打湿更多人(或者将同一个人打得更湿一些),就需要增加水气球的数量。
-
软管的有效载荷则是从水龙头到喷嘴的水流。在特定的时间点,花园软管的容量可能是有限的,但是在打水仗的过程中它能供应的水流却是 “无限” 的。只要水源源不断地从龙头流入软管,那么水也会继续源源不断地从喷嘴喷出去。同一个软管也非常好扩展,你可以尽情和更多的朋友打水仗。
虽然使用水气球(或者应用命令式编程)没有什么固有的问题,但是持有软管(或者应用反应式编程)通常可以扩大伸缩性和性能方面的优势。
定义反应式流
反应式流(reactive streams)是Netflix、Lightbend和Pivotal(Spring背后的公司)的工程师于2013年底开始制定的一种规范。反应式流旨在提供无阻塞回压的异步流处理标准。
我们已经接触到反应式编程的异步特性,它使我们能够并行执行任务从而实现更高的可伸缩性。通过回压,数据消费者可以限制它们想要处理的数据量,避免被过快的数据源产生的数据淹没。
Java的流和反应式流
Java的流和反应式流之间有着很许多相似之处。它们的名字中都有流(stream)这个词。它们也都提供了用于处理数据的函数式API。 事实上,正如我们会在Reactor项目中看到的那样,它们甚至可以共享许多操作。
然而,Java的流通常都是同步的,并且只能处理有限的数据集。本质上来说,它们只是使用函数来对集合进行迭代的一种方式。
反应式流支持异步处理任意大小的数据集,包括无限的数据集。只要数据就绪,它们就能实时地处理数据,并且通过回压来避免压垮数据消费者。
JDK 9中的Flow API对应反应式流,其中的Flow.Publisher、Flow.Subscriber、Flow. Subscription和Flow.Processor类型分别直接映射到反应式流中的Publisher、Subscriber、Subscription和Processor。也就是说,JDK 9的Flow API并不是反应式流的实际实现。
反应式流规范可以总结为4个接口,即Publisher、Subscriber、Subscription和Processor。Publisher负责生成数据,并将数据发送给Subscription(每个Subscriber对应一个Subscription)。Publisher接口声明了一个方法subscribe(),Subscriber可以通过该方法向Publisher发起订阅。
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Subscriber一旦订阅成功,就可以接收来自 Publisher 的事件。这些事件是通过Subscriber 接口上的方法发送的:
public interface Subscriber<T> {
void onSubscribe(Subscription sub);
void onNext(T item);
void onError(Throwable ex);
void onComplete();
}
Subscriber收到的第一个事件是通过对onSubscribe()方法的调用接收的。Publisher调用onSubscribe()方法时,它将Subscription对象传递给Subscriber。通过Subscription,Subscriber可以管理其订阅情况:
public interface Subscription {
void request(long n);
void cancel();
}
Subscriber可以通过调用request()方法来请求Publisher发送数据,也可以通过调用cancel()方法来表明它不再对数据感兴趣并且取消订阅。当调用request()时,Subscriber 可以传入一个long类型的值以表明它愿意接受多少数据。这也是回压能够发挥作用的地方——避免Publisher发送超过 Subscriber处理能力的数据量。在Publisher发送完所请求数量的数据项之后,Subscriber可以再次调用 request()方法来请求更多的数据。
Subscriber请求数据之后,数据就会开始流经反应式流。Publisher发布的每个数据项都会通过调用Subscriber的onNext()方法递交给 Subscriber。如果有任何的错误,则会调用 onError()方法。如果Publisher目前没有更多的数据,而且也不会继续产生更多的数据,那么它将会调用Subscriber的onComplete()方法来告知Subscriber 它已经结束。
至于Processor接口,它是Subscriber和Publisher的组合:
public interface Processor<T, R>
extends Subscriber<T>, Publisher<R> {}
当作为 Subscriber时,Processor会接收数据并以某种方式对数据进行处理。然后,它会将角色转变为Publisher,将处理的结果发布给它的Subscriber。
正如你所看到的,反应式流的规范非常简单。看起来,很容易就能构建一个以Publisher作为开始的数据处理管道,并让数据通过零个或多个Processor,然后将最终结果投递给Subscriber。
然而,反应式流规范的接口本身并不支持以函数式的方式组成这样的流。Reactor项目是反应式流规范的一个实现,它提供了一组用于组装反应式流的函数式API。我们将会在后面的内容中看到,Reactor构成了Spring 反应式编程模型的基础。接下来,我们会探讨Reactor项目(并且,我敢说这个过程非常有意思)。