异步队列

消息队列提供了一种异步通信协议。在异步通信协议中,发送方和接收方不必同时与消息队列交互。

另一方面,典型的 HTTP 是一种同步通信协议,这意味着客户端会被阻塞,直到操作完成。

试想一下:你打电话给某人,然后等待电话铃声响起,与你通话的人就在此时此地聆听你要说的话。通信结束时,你说再见,另一端的人也会回你一句再见。这可以被视为同步通信,因为在收到通信对象的回复之前,您不会做任何事情来结束通信。

但是,如果你向某人发送一条短信,那么在你发送完短信后,你就可以去做你想做的任何事情;当对方想与你交流时,你也可以收到对方的回信。当别人在起草回信时,你可以去做任何你想做的事。虽然你不直接与发件人通信,但你仍然可以与手机保持同步通信,当你收到新信息时,手机会通知你(或者干脆每隔几分钟查看一次手机);但与对方的通信本身是异步的。任何一方都不需要知道对方的任何信息,他们只是在关注自己的短信,以便相互通信。

消息队列模式(RabbitMQ 入门)

RabbitMQ 是一个消息代理;它接受并转发消息。在此,让我们对其进行配置,以便从一个 PHP 脚本向另一个 PHP 脚本发送消息。

想象一下,我们将一个数据包交给快递员,然后再交给客户端;RabbitMQ 是快递员,而脚本则分别是接收和发送数据包的个人。

第一步,让我们安装 RabbitMQ;我将在 Ubuntu 14.04 系统上进行演示。

首先,我们需要将 RabbitMQ APT 存储库添加到 /etc/apt/sources.list.d 文件夹中。幸运的是,这可以通过命令来执行,如下所示:

echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list

请注意,版本库可能会发生变化;如果发生变化,您可以在 https://www.rabbitmq.com/install-debian.html 上找到最新的详细信息。

我们还可以选择将 RabbitMQ 公钥添加到受信任的密钥列表中,以避免在通过 apt 命令安装或升级软件包时出现软件包未签名的警告:

wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -

到目前为止,一切顺利:

image 2023 10 31 12 24 52 572

接下来,我们只需运行一个 apt-get update 命令,从我们包含的新版本库中获取软件包。完成后,我们就可以使用 apt-get install rabbitmq-server 命令来安装我们需要的软件包了:

image 2023 10 31 12 25 48 902

请务必在询问时接受各种提示:

image 2023 10 31 12 26 11 959

安装后,您可以运行 rabbitmqctl status 来检查应用程序的状态以检查其运行是否正常:

image 2023 10 31 12 26 39 410

让我们的生活变得更轻松一点。我们可以使用 Web GUI 来管理 RabbitMQ;只需运行以下命令:

rabbitmq-plugins enable rabbitmq_management
image 2023 10 31 12 27 37 636

我们现在可以在 <your server IP here>:15672 看到一个管理界面:

image 2023 10 31 12 28 22 614

但在登录之前,我们必须创建一些登录凭据。为了做到这一点,我们将不得不返回到命令行。

首先,我们需要设置一个新帐户,用户名为 junade,密码为 insecurepassword

rabbitmqctl add_user junade insecurepassword

然后我们可以添加一些管理员权限:

rabbitmqctl set_user_tags junade administrator
rabbitmqctl set_permissions -p / junade ".*" ".*" ".*"

返回登录页面,输入这些凭据后,我们现在可以看到很酷的管理界面:

image 2023 10 31 12 29 52 016
Figure 1. This is the web interface for the RabbitMQ service, accessible through our web browser

现在我们可以测试我们已经安装的内容了。让我们首先为这个新项目编写一个 composer.json 文件:

{
    "require": {
      "php-amqplib/php-amqplib": "2.5.*"
    }
}

RabbitMQ 使用 高级消息队列协议 (AMQP),这也是我们安装 PHP 库的原因,该库基本上可以帮助我们通过此协议与 RabbitMQ 通信。

接下来,我们可以编写一些代码,使用刚刚安装的 RabbitMQ 消息代理发送消息:

这里假定端口是 5672,并且安装在 localhost 上,这可能会根据您的情况发生变化。

让我们编写一个 PHP 脚本来利用它:

Unresolved include directive in modules/ROOT/pages/ch06/ch6-04.adoc - include::example$Chapter 6/GettingStarted/send.php[]

因此,让我们将其细分一下。在前几行中,我们只需包含 Composer 自动加载的库,并说明我们要使用的命名空间。当我们实例化 AMQPStreamConnection 对象时,我们实际上已经连接到了消息代理;然后我们就可以创建一个新的通道对象,并用它来声明一个新队列。我们通过调用 queue_declare 消息来声明队列。持久选项允许消息在 RabbitMQ 重启后继续存活。最后,我们继续发送消息。

现在让我们运行此脚本:

php send.php

输出结果如下:

image 2023 10 31 12 34 36 885

如果您现在转到 RabbitMQ 的 Web 界面,请单击 “队列” 选项卡并切换 “获取消息” 对话框;您应该能够提取我们刚刚发送给代理的消息:

image 2023 10 31 12 35 21 989
Figure 2. Using this web page in the interface, we can extract messages from the queue so we can look at their contents

当然,这只是故事的一半。我们现在需要使用另一个应用程序实际检索此消息。

让我们编写一个 receive.php 脚本:

Unresolved include directive in modules/ROOT/pages/ch06/ch6-04.adoc - include::example$Chapter 6/GettingStarted/recieve.php[]

请注意,前几行与我们的发送脚本相同;我们甚至重新声明队列,以防在运行 send.php 脚本之前运行此接收脚本。

让我们运行 receive.php 脚本:

image 2023 10 31 12 37 23 710

在另一个 bash 终端中,让我们运行 send.php 脚本几次:

image 2023 10 31 12 37 51 828

因此,在 receive.php 终端选项卡中,我们现在可以看到我们已经收到了我们发送的消息:

image 2023 10 31 12 38 23 291

RabbitMQ 文档使用下图来描述消息的基本接受和转发:

image 2023 10 31 12 38 57 616

发布-订阅模式

发布-订阅者模式(简称 Pub/Sub)是一种设计模式,根据这种模式,消息不会直接从发布者发送到订阅者;相反,发布者会在不知情的情况下推送消息。

在 RabbitMQ 中,生产者从不直接向队列发送任何消息。很多时候,生产者甚至根本不知道消息最终是否会进入队列。相反,生产者必须将消息发送到交换中心。它从生产者那里接收消息,然后将消息推送到队列中。

消费者是接收信息的应用程序。

必须明确告诉交换中心如何处理给定的报文,以及报文应附加到哪个队列。这些规则由交换类型定义。

RabbitMQ 文档对发布-订阅者关系(连接发布者、交换、队列和消费者)的描述如下:

image 2023 10 31 12 40 13 838

直接(direct)交换类型根据路由密钥传递信息。它既可用于一对一路由,也可用于一对多路由,但最适合一对一关系。

扇出(fanout)交换类型会将信息路由到与之绑定的所有队列,路由密钥会被完全忽略。实际上,你无法根据路由密钥来区分信息将被分发到哪些工作者。

主题(topic)交换类型的工作原理是,根据消息路由队列和用于将队列绑定到交换类型的模式,将消息路由到一个或多个队列。当有多个消费者/应用程序想选择他们想接收的信息类型时,通常在多对多的关系中,这种交换就有可能很好地发挥作用。

报文头(header)交换类型通常用于根据一组属性进行路由,这些属性在报文头中比在路由队列中表达得更好。路由的属性不是使用路由键,而是基于报文头属性。

为了测试 Pub/Sub 队列,我们将使用以下脚本。这些脚本与前面示例中的脚本类似,只是我对它们进行了修改,使其使用交换。下面是我们的 send.php 文件:

Unresolved include directive in modules/ROOT/pages/ch06/ch6-04.adoc - include::example$Chapter 6/PubSub/send.php[]

这是我们的 receive.php 文件。和以前一样,我修改了这个脚本,以便它也使用交换:

Unresolved include directive in modules/ROOT/pages/ch06/ch6-04.adoc - include::example$Chapter 6/PubSub/recieve.php[]

现在,让我们测试这些脚本。我们首先需要运行 receive.php 脚本,然后我们可以使用 send.php 脚本发送消息。

首先,让我们触发 receive.php 脚本以使其开始运行:

image 2023 10 31 12 44 18 509

完成后,我们可以通过运行 send.php 脚本继续发送消息:

image 2023 10 31 12 44 45 596

现在,这将使用以下信息填充运行 receive.php 的终端:

image 2023 10 31 12 45 12 517

总结

在本章中,我们学习了架构模式。从 MVC 开始,我们了解了使用用户界面框架的好处和挑战,并讨论了如何以更严格的方式将用户界面与业务逻辑解耦。

然后,我们讨论了 SOA,了解了 SOA 与微服务的比较,以及在分布式系统面临挑战的情况下,这种架构的合理性。

最后,我们深入了解了队列系统、适合的情况以及如何在 RabbitMQ 中实现队列系统。

在下一章和最后一章中,我们将介绍架构模式的最佳实践使用条件。