异步队列
消息队列提供了一种异步通信协议。在异步通信协议中,发送方和接收方不必同时与消息队列交互。
另一方面,典型的 HTTP 是一种同步通信协议,这意味着客户端会被阻塞,直到操作完成。
试想一下:你打电话给某人,然后等待电话铃声响起,与你通话的人就在此时此地聆听你要说的话。通信结束时,你说再见,另一端的人也会回你一句再见。这可以被视为同步通信,因为在收到通信对象的回复之前,您不会做任何事情来结束通信。
但是,如果你向某人发送一条短信,那么在你发送完短信后,你就可以去做你想做的任何事情;当对方想与你交流时,你也可以收到对方的回信。当别人在起草回信时,你可以去做任何你想做的事。虽然你不直接与发件人通信,但你仍然可以与手机保持同步通信,当你收到新信息时,手机会通知你(或者干脆每隔几分钟查看一次手机);但与对方的通信本身是异步的。任何一方都不需要知道对方的任何信息,他们只是在关注自己的短信,以便相互通信。
消息队列模式(RabbitMQ 入门)
RabbitMQ 是一个消息代理;它接受并转发消息。在此,让我们对其进行配置,以便从一个 PHP 脚本向另一个 PHP 脚本发送消息。
想象一下,我们将一个数据包交给快递员,然后再交给客户端;RabbitMQ 是快递员,而脚本则分别是接收和发送数据包的个人。
第一步,让我们安装 RabbitMQ;我将在 Ubuntu 14.04 系统上进行演示。
首先,我们需要将 RabbitMQ APT 存储库添加到 /etc/apt/sources.list.d
文件夹中。幸运的是,这可以通过命令来执行,如下所示:
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
请注意,版本库可能会发生变化;如果发生变化,您可以在 https://www.rabbitmq.com/install-debian.html 上找到最新的详细信息。
我们还可以选择将 RabbitMQ 公钥添加到受信任的密钥列表中,以避免在通过 apt
命令安装或升级软件包时出现软件包未签名的警告:
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
到目前为止,一切顺利:

接下来,我们只需运行一个 apt-get update
命令,从我们包含的新版本库中获取软件包。完成后,我们就可以使用 apt-get install rabbitmq-server
命令来安装我们需要的软件包了:

请务必在询问时接受各种提示:

安装后,您可以运行 rabbitmqctl status 来检查应用程序的状态以检查其运行是否正常:

让我们的生活变得更轻松一点。我们可以使用 Web GUI 来管理 RabbitMQ;只需运行以下命令:
rabbitmq-plugins enable rabbitmq_management

我们现在可以在 <your server IP here>:15672
看到一个管理界面:

但在登录之前,我们必须创建一些登录凭据。为了做到这一点,我们将不得不返回到命令行。
首先,我们需要设置一个新帐户,用户名为 junade
,密码为 insecurepassword
:
rabbitmqctl add_user junade insecurepassword
然后我们可以添加一些管理员权限:
rabbitmqctl set_user_tags junade administrator
rabbitmqctl set_permissions -p / junade ".*" ".*" ".*"
返回登录页面,输入这些凭据后,我们现在可以看到很酷的管理界面:

现在我们可以测试我们已经安装的内容了。让我们首先为这个新项目编写一个 composer.json
文件:
{
"require": {
"php-amqplib/php-amqplib": "2.5.*"
}
}
RabbitMQ 使用 高级消息队列协议 (AMQP),这也是我们安装 PHP 库的原因,该库基本上可以帮助我们通过此协议与 RabbitMQ 通信。
接下来,我们可以编写一些代码,使用刚刚安装的 RabbitMQ
消息代理发送消息:
这里假定端口是 5672,并且安装在 localhost 上,这可能会根据您的情况发生变化。
让我们编写一个 PHP 脚本来利用它:
Unresolved include directive in modules/ROOT/pages/ch06/ch6-04.adoc - include::example$Chapter 6/GettingStarted/send.php[]
因此,让我们将其细分一下。在前几行中,我们只需包含 Composer 自动加载的库,并说明我们要使用的命名空间。当我们实例化 AMQPStreamConnection
对象时,我们实际上已经连接到了消息代理;然后我们就可以创建一个新的通道对象,并用它来声明一个新队列。我们通过调用 queue_declare
消息来声明队列。持久选项允许消息在 RabbitMQ 重启后继续存活。最后,我们继续发送消息。
现在让我们运行此脚本:
php send.php
输出结果如下:

如果您现在转到 RabbitMQ 的 Web 界面,请单击 “队列” 选项卡并切换 “获取消息” 对话框;您应该能够提取我们刚刚发送给代理的消息:

当然,这只是故事的一半。我们现在需要使用另一个应用程序实际检索此消息。
让我们编写一个 receive.php 脚本:
Unresolved include directive in modules/ROOT/pages/ch06/ch6-04.adoc - include::example$Chapter 6/GettingStarted/recieve.php[]
请注意,前几行与我们的发送脚本相同;我们甚至重新声明队列,以防在运行 send.php
脚本之前运行此接收脚本。
让我们运行 receive.php
脚本:

在另一个 bash 终端中,让我们运行 send.php
脚本几次:

因此,在 receive.php
终端选项卡中,我们现在可以看到我们已经收到了我们发送的消息:

RabbitMQ 文档使用下图来描述消息的基本接受和转发:

发布-订阅模式
发布-订阅者模式(简称 Pub/Sub)是一种设计模式,根据这种模式,消息不会直接从发布者发送到订阅者;相反,发布者会在不知情的情况下推送消息。
在 RabbitMQ 中,生产者从不直接向队列发送任何消息。很多时候,生产者甚至根本不知道消息最终是否会进入队列。相反,生产者必须将消息发送到交换中心。它从生产者那里接收消息,然后将消息推送到队列中。
消费者是接收信息的应用程序。
必须明确告诉交换中心如何处理给定的报文,以及报文应附加到哪个队列。这些规则由交换类型定义。
RabbitMQ 文档对发布-订阅者关系(连接发布者、交换、队列和消费者)的描述如下:

直接(direct)交换类型根据路由密钥传递信息。它既可用于一对一路由,也可用于一对多路由,但最适合一对一关系。
扇出(fanout)交换类型会将信息路由到与之绑定的所有队列,路由密钥会被完全忽略。实际上,你无法根据路由密钥来区分信息将被分发到哪些工作者。
主题(topic)交换类型的工作原理是,根据消息路由队列和用于将队列绑定到交换类型的模式,将消息路由到一个或多个队列。当有多个消费者/应用程序想选择他们想接收的信息类型时,通常在多对多的关系中,这种交换就有可能很好地发挥作用。
报文头(header)交换类型通常用于根据一组属性进行路由,这些属性在报文头中比在路由队列中表达得更好。路由的属性不是使用路由键,而是基于报文头属性。
为了测试 Pub/Sub 队列,我们将使用以下脚本。这些脚本与前面示例中的脚本类似,只是我对它们进行了修改,使其使用交换。下面是我们的 send.php
文件:
Unresolved include directive in modules/ROOT/pages/ch06/ch6-04.adoc - include::example$Chapter 6/PubSub/send.php[]
这是我们的 receive.php
文件。和以前一样,我修改了这个脚本,以便它也使用交换:
Unresolved include directive in modules/ROOT/pages/ch06/ch6-04.adoc - include::example$Chapter 6/PubSub/recieve.php[]
现在,让我们测试这些脚本。我们首先需要运行 receive.php
脚本,然后我们可以使用 send.php
脚本发送消息。
首先,让我们触发 receive.php
脚本以使其开始运行:

完成后,我们可以通过运行 send.php
脚本继续发送消息:

现在,这将使用以下信息填充运行 receive.php
的终端:
