请求/回复模式

单向通信可以在并行性和效率方面给我们带来巨大的优势,但仅靠它们并不能解决我们所有的集成和通信问题。 有时,一个好的旧请求/回复模式可能就是完成这项工作的完美工具。 但是,在某些情况下,我们所拥有的只是一个异步单向通道。 因此,了解构建抽象所需的各种模式和方法非常重要,该抽象允许我们在单向通道之上以请求/回复的方式交换消息。 这正是我们接下来要学习的内容。

相关标识符

我们要学习的第一个请求/回复模式称为相关标识符,它表示在单向通道之上构建请求/回复抽象的基本块。

该模式涉及使用标识符标记每个请求,然后将其附加到接收者的响应中:这样,请求的发送者可以将两个消息关联起来并将响应返回到正确的处理程序。 这完美地解决了单向异步通道环境中的问题,其中消息可以随时向任何方向传输。 我们看一下下图中的例子:

image 2024 05 08 16 03 35 454
Figure 1. 图 13.21:使用相关标识符的请求/回复消息交换

图 13.21 中描述的场景显示了如何使用相关 ID 使我们能够将每个响应与正确的请求相匹配,即使这些响应是按不同的顺序发送和接收的。 一旦我们开始研究下一个示例,其工作方式就会更加清晰。

使用相关标识符实现请求/回复抽象

现在让我们开始研究一个示例,选择最简单的单向通道类型; 点对点(直接连接系统的两个节点)和全双工(消息可以双向传输)的一种。

在这个简单的通道类别中,我们可以找到例如WebSockets:它们在服务器和浏览器之间建立点对点连接,并且消息可以向任何方向传输。 另一个例子是使用 child_process.fork() 生成子进程时创建的通信通道(我们已经在第 11 章“高级配方”中见过这个 API)。 该通道也是异步、点对点和双工的,因为它仅将父进程与子进程连接,并且允许消息在任何方向上传输。 这可能是该类别中最基本的通道,因此我们将在下一个示例中使用它。

下一个应用程序的计划是构建一个抽象,以包装在父进程和子进程之间创建的通道。 此抽象应通过自动使用相关标识符标记每个请求,然后将任何传入回复的 ID 与等待响应的请求处理程序列表进行匹配,来提供请求/回复通信通道。

从第 11 章 “高级食谱” 中,我们应该记住,父进程可以使用 child.send(message) 向子进程发送消息,而可以使用 child.on('message', callback) 事件处理程序接收消息。

类似地,子进程可以使用 process.send(message) 向父进程发送消息,并使用 process.on('message', callback) 接收消息。

这意味着父进程中可用的通道接口与子进程中可用的通道接口相同。 这将使我们能够构建一个可以从通道两端使用的通用抽象。

抽象请求

让我们通过考虑负责发送新请求的部分来开始构建这个抽象。 让我们创建一个名为 createRequestChannel.js 的新文件,其中包含以下内容:

import {nanoid} from 'nanoid'

export function createRequestChannel(channel) { // (1)
    const correlationMap = new Map()

    function sendRequest(data) { // (2)
        console.log('Sending request', data)
        return new Promise((resolve, reject) => {
            const correlationId = nanoid()
            const replyTimeout = setTimeout(() => {
                correlationMap.delete(correlationId)
                reject(new Error('Request timeout'))
            }, 10000)
            correlationMap.set(correlationId, (replyData) => {
                correlationMap.delete(correlationId)
                clearTimeout(replyTimeout)
                resolve(replyData)
            })
            channel.send({
                type: 'request',
                data,
                id: correlationId
            })
        })
    }

    channel.on('message', message => { // (3)
        const callback = correlationMap.get(message.inReplyTo)
        if (callback) {
            callback(message.data)
        }
    })
    return sendRequest
}

这就是我们的请求抽象的工作原理:

  1. createRequestChannel() 是一个工厂,它包装输入通道并返回用于发送请求和接收回复的 sendRequest() 函数。 该模式的神奇之处在于correlationMap 变量,它存储传出请求与其回复处理程序之间的关联。

  2. sendRequest()函数用于发送新请求。 它的工作是使用 nanoid 包 (nodejsdp.link/nanoid) 生成相关 ID,然后将请求数据包装在一个信封中,该信封允许我们指定相关 ID 和消息类型。 然后,将相关 ID 和负责将回复数据返回给调用者的处理程序(在后台使用resolve())添加到 correlationMap 中,以便稍后可以使用相关 ID 检索处理程序。 我们还实现了一个非常简单的请求超时逻辑。

  3. 当工厂被调用时,我们也开始监听传入的消息。 如果消息的相关ID(包含在inReplyTo属性中)与correlationMap映射中包含的任何ID匹配,我们就知道我们刚刚收到了回复,因此我们获得了对关联响应处理程序的引用,并使用 消息中包含的数据。

这就是 createRequestChannel.js 模块的内容。 让我们继续下一部分。

抽象回复

我们距离实现完整模式仅一步之遥,所以让我们看看请求通道的对应部分(即回复通道)是如何工作的。 让我们创建另一个名为 createReplyChannel.js 的文件,其中将包含用于包装回复处理程序的抽象:

export function createReplyChannel(channel) {
    return function registerHandler(handler) {
        channel.on('message', async message => {
            if (message.type !== 'request') {
                return
            }
            const replyData = await handler(message.data) // (1)
            channel.send({ // (2)
                type: 'response',
                data: replyData,
                inReplyTo: message.id
            })
        })
    }
}

我们的 createReplyChannel() 函数又是一个工厂,它返回另一个用于注册新回复处理程序的函数。 这是注册新处理程序时发生的情况:

  1. 当我们收到新请求时,我们立即通过传递消息中包含的数据来调用处理程序。

  2. 一旦处理程序完成其工作并返回其回复,我们将围绕数据构建一个信封,并包含消息类型和请求的相关 ID(inReplyTo 属性),然后将所有内容放回通道中。

这种模式的神奇之处在于,在 Node.js 中它变得非常容易:对我们来说一切都已经是异步的,因此构建在单向通道之上的异步请求/回复通信与任何其他异步操作没有太大区别, 特别是如果我们构建一个抽象来隐藏其实现细节。

尝试完整的请求/回复周期

现在我们准备尝试新的异步请求/回复抽象。 让我们在名为 replier.js 的文件中创建一个示例回复器:

import {createReplyChannel} from './createReplyChannel.js'

const registerReplyHandler = createReplyChannel(process)
registerReplyHandler(req => {
    return new Promise(resolve => {
        setTimeout(() => {
            resolve({sum: req.a + req.b})
        }, req.delay)
    })
})
process.send('ready')

我们的回复器只是计算请求中收到的两个数字之间的总和,并在一定延迟(也在请求中指定)后返回结果。 这将使我们能够验证响应的顺序是否与发送请求的顺序不同,以确认我们的模式正在工作。 使用模块的最后一条指令,我们向父进程发送一条消息,表明子进程已准备好接受请求。

完成该示例的最后一步是在名为 requestor.js 的文件中创建请求者,该文件还具有使用 child_ process.fork() 启动回复者的任务:

import {fork} from 'child_process'
import {dirname, join} from 'path'
import {fileURLToPath} from 'url'
import {once} from 'events'
import {createRequestChannel} from './createRequestChannel.js'

const __dirname = dirname(fileURLToPath(import.meta.url))

async function main() {
    const channel = fork(join(__dirname, 'replier.js')) // (1)
    const request = createRequestChannel(channel)
    try {
        const [message] = await once(channel, 'message') // (2)
        console.log(`Child process initialized: ${message}`)
        const p1 = request({a: 1, b: 2, delay: 500}) // (3)
            .then(res => {
                console.log(`Reply: 1 + 2 = ${res.sum}`)
            })
        const p2 = request({a: 6, b: 1, delay: 100}) // (4)
            .then(res => {
                console.log(`Reply: 6 + 1 = ${res.sum}`)
            })
        await Promise.all([p1, p2]) // (5)
    } finally {
        channel.disconnect() // (6)
    }
}

main().catch(err => console.error(err))

请求者启动回复者 (1),然后将其引用传递给我们的 createRequestChannel() 抽象。 然后,我们等待子进程可用 (2) 并运行几个示例请求 (3、4)。 最后,我们等待两个请求完成 (5),然后断开通道 (6) 以允许子进程(以及父进程)正常退出。

要试用该示例,只需启动 requestor.js 模块即可。 输出应该类似于以下内容:

Child process initialized: ready
Sending request { a: 1, b: 2, delay: 500 }
Sending request { a: 6, b: 1, delay: 100 }
Reply: 6 + 1 = 7
Reply: 1 + 2 = 3

这证实了我们的请求/回复消息传递模式的实现完美地工作,并且回复与其各自的请求正确关联,无论它们以什么顺序发送或接收。

当我们拥有单个点对点通道时,我们在本节中讨论的技术非常有效。 但是,如果我们有一个具有多个通道或队列的更复杂的架构,会发生什么? 这就是我们接下来要看到的。

返回地址

相关标识符是在单向通道之上创建请求/回复通信的基本模式。 然而,当我们的消息传递架构有多个通道或队列,或者可能有多个请求者时,这还不够。 在这些情况下,除了相关 ID 之外,我们还需要知道返回地址,这是一条允许回复者将响应发送回请求的原始发送者的信息。

在 AMQP 中实现返回地址模式

在基于 AMQP 的架构的上下文中,返回地址是请求者侦听传入回复的队列。 由于响应只能由一个请求者接收,因此队列是私有的并且不能在不同的消费者之间共享,这一点很重要。 从这些属性中,我们可以推断我们将需要一个作用域为请求者连接的瞬态队列,并且回复者必须与返回队列建立点对点通信才能传递其响应。

下图为我们提供了此场景的示例:

image 2024 05 08 16 10 51 049
Figure 2. 图 13.22:使用 AMQP 的请求/回复消息传递架构

图 13.22 向我们展示了每个请求者如何拥有自己的私有队列,专门用于处理对其请求的回复。 所有请求都被发送到单个队列,然后由回复者使用。 由于请求中指定的返回地址信息,回复者会将回复路由到正确的响应队列。

事实上,要在 AMQP 之上创建请求/回复模式,我们所需要做的就是在消息属性中指定响应队列的名称,以便回复者知道响应消息必须传递到哪里。

这个理论看起来非常简单,所以让我们看看如何在实际应用中实现它。

实现抽象请求

现在让我们在 AMQP 之上构建一个请求/回复抽象。 我们将使用 RabbitMQ 作为代理,但任何兼容的 AMQP 代理都可以完成这项工作。 让我们从 amqpRequest.js 模块中实现的请求抽象开始。 我们将在这里一次展示一段代码,以便于解释。 让我们从AMQPRequest类的构造函数开始:

export class AMQPRequest {
    constructor () {
        this.correlationMap = new Map()
    }
//...

从前面的代码中可以看出,我们将再次使用相关标识符模式,因此我们需要一个映射来保存消息 ID 和相关处理程序之间的关联。

然后,我们需要一个方法来初始化 AMQP 连接及其对象:

async initialize () {
    this.connection = await amqp.connect('amqp://localhost')
    this.channel = await this.connection.createChannel()
    const { queue } = await this.channel.assertQueue('', // (1)
        { exclusive: true })
    this.replyQueue = queue
    this.channel.consume(this.replyQueue, msg => { // (2)
        const correlationId = msg.properties.correlationId
        const handler = this.correlationMap.get(correlationId)
        if (handler) {
            handler(JSON.parse(msg.content.toString()))
        }
    }, { noAck: true })
}

这里需要观察的有趣的事情是我们如何创建队列来保存回复 (1)。 特殊之处在于我们没有指定任何名称,这意味着将为我们选择一个随机名称。 除此之外,队列是独占的,这意味着它绑定到当前活动的 AMQP 连接,并且当连接关闭时它将被销毁。 不需要将队列绑定到交换器,因为我们不需要任何路由或分发到多个队列,这意味着消息必须直接传递到我们的响应队列中。 在函数(2)的第二部分中,我们开始消费来自replyQueue的消息。 在这里,我们将传入消息的 ID 与关联映射中的 ID 进行匹配,并调用关联的处理程序。

接下来,让我们看看如何发送新请求:

send (queue, message) {
    return new Promise((resolve, reject) => {
        const id = nanoid() // (1)
        const replyTimeout = setTimeout(() => {
            this.correlationMap.delete(id)
            reject(new Error('Request timeout'))
        }, 10000)
        this.correlationMap.set(id, (replyData) => { // (2)
            this.correlationMap.delete(id)
            clearTimeout(replyTimeout)
            resolve(replyData)
        })
        this.channel.sendToQueue(queue, // (3)
            Buffer.from(JSON.stringify(message)),
            { correlationId: id, replyTo: this.replyQueue }
        )
    })
}

send() 方法接受请求队列的名称和要发送的消息作为输入。 正如我们在上一节中了解到的,我们需要生成一个相关 ID (1) 并将其关联到负责将回复返回给调用者的处理程序 (2)。 最后,我们发送消息 (3),将correlationId 和replyTo 属性指定为元数据。 事实上,在 AMQP 中,我们可以指定一组属性(或元数据)与主消息一起传递给消费者。 元数据对象作为 sendToQueue() 方法的第三个参数传递。

需要注意的是,我们使用channel.sentToQueue() API而不是channel.publish()来发送消息。 这是因为我们对使用交换实现发布/订阅分发模式不感兴趣,而是对直接到目标队列的更基本的点对点传递感兴趣。

AMQPRequest 类的最后一部分是我们实现 destroy() 方法的地方,该方法用于关闭连接和通道:

    destroy () {
        this.channel.close()
        this.connection.close()
    }
}

amqpRequest.js 模块就这样了。

实现抽象回复

现在是时候在名为 amqpReply.js 的新模块中实现回复抽象了:

import amqp from 'amqplib'

export class AMQPReply {
    constructor(requestsQueueName) {
        this.requestsQueueName = requestsQueueName
    }

    async initialize() {
        const connection = await amqp.connect('amqp://localhost')
        this.channel = await connection.createChannel()
        const {queue} = await this.channel.assertQueue( // (1)
            this.requestsQueueName)
        this.queue = queue
    }

    handleRequests(handler) { // (2)
        this.channel.consume(this.queue, async msg => {
            const content = JSON.parse(msg.content.toString())
            const replyData = await handler(content)
            this.channel.sendToQueue( // (3)
                msg.properties.replyTo,
                Buffer.from(JSON.stringify(replyData)),
                {correlationId: msg.properties.correlationId}
            )
            this.channel.ack(msg)
        })
    }
}

在AMQPReply 类的initialize() 方法中,我们创建将接收传入请求的队列(1):我们可以使用简单的持久队列来实现此目的。 handleRequests() 方法 (2) 用于注册新的请求处理程序,从该处理程序可以发送新的回复。 当发回回复(3)时,我们使用channel.sendToQueue()将消息直接发布到消息的replyTo属性(我们的返回地址)中指定的队列中。 我们还在回复中设置了correlationId,以便接收者可以将消息与待处理请求列表进行匹配。

实现请求者和回复者

现在一切都准备好了,可以尝试我们的系统,但首先,让我们构建一对示例请求者和回复者,看看如何使用我们的新抽象。

让我们从replier.js模块开始:

import {AMQPReply} from './amqpReply.js'

async function main() {
    const reply = new AMQPReply('requests_queue')
    await reply.initialize()
    reply.handleRequests(req => {
        console.log('Request received', req)
        return {sum: req.a + req.b}
    })
}

main().catch(err => console.error(err))

很高兴看到我们构建的抽象如何允许我们隐藏处理相关 ID 和返回地址的所有机制。 我们需要做的就是初始化一个新的回复对象,指定我们想要接收请求的队列的名称('requests_queue')。 其余的代码很简单; 实际上,我们的示例回复器只是计算作为输入接收的两个数字的总和,并将结果发送回对象中。

另一方面,我们在 requestor.js 文件中实现了一个示例请求器:

import {AMQPRequest} from './amqpRequest.js'
import delay from 'delay'

async function main() {
    const request = new AMQPRequest()
    await request.initialize()

    async function sendRandomRequest() {
        const a = Math.round(Math.random() * 100)
        const b = Math.round(Math.random() * 100)
        const reply = await request.send('requests_queue', {a, b})
        console.log(`${a} + ${b} = ${reply.sum}`)
    }

    for (let i = 0; i < 20; i++) {
        await sendRandomRequest()
        await delay(1000)
    }
    request.destroy()
}

main().catch(err => console.error(err))

我们的示例请求者以一秒的间隔向 requests_queue 队列发送 20 个随机请求。 在这种情况下,有趣的是我们的抽象完美地完成了它的工作,隐藏了异步请求/回复模式实现背后的所有细节。

现在,要试用该系统,只需运行回复器模块,然后运行几个请求者实例:

node replier.js
node requestor.js
node requestor.js

您将看到请求者发布的一组操作,然后回复者接收这些操作,然后回复者将响应发送回正确的请求者。

现在我们可以尝试其他实验。 一旦应答器第一次启动,它就会创建一个持久队列,这意味着如果我们现在停止它然后再次运行应答器,则不会丢失任何请求。 所有消息将被存储在队列中,直到回复器再次启动!

请注意,根据我们实现应用程序的方式,请求将在 10 秒后超时。 因此,为了使回复及时到达请求者,回复者只能承受有限的停机时间(当然少于 10 秒)。

我们通过使用 AMQP 免费获得的另一个很好的功能是我们的应答器是开箱即用的可扩展性。 为了测试这个假设,我们可以尝试启动两个或多个回复器实例,并观察它们之间的负载平衡请求。 这是有效的,因为每次请求者启动时,它都会将自己作为侦听器附加到同一个持久队列,因此,代理将在队列的所有消费者之间对消息进行负载平衡(还记得竞争消费者模式吗?)。 甜的!

ZeroMQ 有一对专门用于实现请求/回复模式的套接字,称为 REQ/REP,但是它们是同步的(一次只有一个请求/响应)。 使用更复杂的技术可以实现更复杂的请求/回复模式。 有关更多信息,您可以阅读官方指南:nodejsdp.link/zeromq-reqrep。

带有返回地址的请求/回复模式也可以在 Redis Streams 之上实现,并且与我们使用 AMQP 实现的系统非常相似。 我们将把它留给您作为练习来实施。