任务分配模式

在第 11 章 “高级配方” 中,我们学习了如何将代价高昂的任务分配给多个本地进程。因此,在本节中,我们将了解如何在分布式架构中使用类似的模式,使用位于网络中任何位置的远程工作者。

我们的想法是采用一种消息传递模式,让我们可以在多台机器上执行任务。这些任务可能是单独的工作块,也可能是采用分而治之的方法分割的更大任务的一部分。

如果我们查看下图中表示的逻辑架构,我们应该能够识别出熟悉的模式:

image 2024 05 08 14 23 36 952
Figure 1. 图 13.15:将任务分配给一组消费者

从图13.15的图中我们可以看到,发布/订阅模式不适合这种类型的应用程序,因为我们绝对不希望一个任务被多个worker接收。 相反,我们需要的是一种类似于负载均衡器的消息分发模式,它将每条消息分派给不同的消费者(在本例中也称为工作线程)。 在消息传递系统术语中,这种模式也称为竞争消费者、扇出分布或通风机。

与我们在上一章中看到的 HTTP 负载均衡器的一个重要区别是,在这里,消费者扮演着更积极的角色。 事实上,正如我们稍后将看到的,大多数时候不是生产者连接到消费者,而是消费者本身连接到任务生产者或任务队列以接收新作业。 这在可扩展系统中是一个很大的优势,因为它允许我们无缝地增加工作人员的数量,而无需修改生产者或采用服务注册表。

此外,在通用消息系统中,我们不一定在生产者和工作人员之间进行请求/回复通信。 相反,大多数时候,首选方法是使用单向异步通信,这样可以实现更好的并行性和可扩展性。 在这样的架构中,消息可能始终朝一个方向传播,从而创建管道,如下图所示:

image 2024 05 08 14 24 41 264
Figure 2. 图 13.16:消息传递管道

管道允许我们构建非常复杂的处理架构,而无需同步请求/回复通信的开销,通常会导致更低的延迟和更高的吞吐量。 在图13.16中,我们可以看到消息如何分布在一组工作人员(扇出)中,转发到其他处理单元,然后聚合到单个节点(扇入)中,通常称为接收器。

在本节中,我们将通过分析两个最重要的变体:点对点和基于代理来重点关注此类架构的构建块。

管道与任务分配模式的组合也称为并行管道。

ZeroMQ 扇出/扇入模式

我们已经发现了 ZeroMQ 用于构建点对点分布式架构的一些功能。 事实上,在上一节中,我们使用 PUB 和 SUB 套接字将单个消息传播给多个消费者,现在,我们将了解如何使用另一对称为 PUSH 和 PULL 的套接字构建并行管道。

推/拉 sockets

直观上,我们可以说 PUSH 套接字用于发送消息,而 PULL 套接字用于接收。 这似乎是一个微不足道的组合,但是,它们具有一些额外的功能,使它们非常适合构建单向通信系统:

  • 两者都可以在连接模式或绑定模式下工作。 换句话说,我们可以创建一个 PUSH 套接字并将其绑定到本地端口,侦听来自 PULL 套接字的传入连接,反之亦然,PULL 套接字可能侦听来自 PUSH 套接字的连接。 消息总是以相同的方向传播,从 PUSH 到 PULL,只是连接的发起者可以不同。 绑定模式是持久节点(例如任务生产者和接收器)的最佳解决方案,而连接模式则非常适合临时节点(例如任务工作人员)。 这允许瞬态节点的数量任意变化,而不影响更稳定、更持久的节点。

  • 如果有多个PULL 套接字连接到单个PUSH 套接字,则消息将均匀分布在所有PULL 套接字上。 实际上,它们是负载平衡的(点对点负载平衡!)。 另一方面,从多个 PUSH 套接字接收消息的 PULL 套接字将使用公平排队系统处理消息,这意味着它们会从所有源均匀地被消耗 - 应用于入站消息的循环。

  • 通过没有任何连接的PULL 套接字的PUSH 套接字发送的消息不会丢失。 相反,它们会排队,直到节点上线并开始拉取消息。

我们现在开始了解 ZeroMQ 与传统 Web 服务有何不同,以及为什么它是构建分布式消息系统的完美工具。

使用 ZeroMQ 构建分布式哈希破解器

现在是时候构建一个示例应用程序来查看我们刚刚描述的 PUSH/PULL 套接字的属性了。

哈希和破解程序是一个简单而有趣的应用程序:一种使用暴力方法尝试将给定哈希和(例如 MD5 或 SHA1)与给定字母表字符的每个可能变体的哈希和相匹配的系统 ,从而发现创建给定哈希和的原始字符串。

这是一个令人尴尬的并行工作负载 (nodejsdp.link/embarrassinglyparallel),非常适合构建演示并行管道的强大功能的示例。

切勿使用纯哈希和来加密密码,因为它们很容易被破解。 请改用专用算法,例如 bcrypt (nodejsdp.link/bcrypt)、scrypt (nodejsdp.link/scrypt)、PBKDF2 (nodejsdp.link/pbkdf2) 或 Argon2 (nodejsdp.link/ argon2)。

对于我们的应用程序,我们希望实现一个典型的并行管道,其中具有以下功能:

  • 一个节点,用于在多个工作人员之间创建和分发任务

  • 多个工作人员节点(实际计算发生的位置)

  • 一个用于收集所有结果的节点

我们刚刚描述的系统可以使用以下架构在 ZeroMQ 中实现:

image 2024 05 08 14 28 20 579
Figure 3. 图 13.17:ZeroMQ 的典型管道架构

在我们的架构中,我们有一个通风机生成给定字母表中字符变体的间隔(例如,间隔 “aa” 到 “bb” 包括变体 “aa”、“ab”、“ba”、“bb”) 并将这些间隔作为任务分配给工人。 然后,每个工作人员计算给定间隔内每个变化的哈希和,尝试将每个结果哈希和与作为输入给出的控制哈希和进行匹配。 如果找到匹配,则将结果发送到结果收集器节点(接收器)。

我们架构中的持久节点是通风机和接收器,而临时节点是工作节点。 这意味着每个 worker 将其 PULL 套接字连接到通风机,并将其 PUSH 套接字连接到接收器,这样我们就可以启动和停止任意数量的工作器,而无需更改通风机或接收器中的任何参数。

实现生产者

为了表示变化的区间,我们将使用索引的 n 叉树。 如果我们想象一棵树,其中每个节点恰好有 n 个子节点,其中每个子节点都是给定字母表的 n 个元素之一,并且我们以广度优先顺序为每个节点分配一个索引,那么,给定字母表 [a, b] 我们应该获得一棵如下所示的树:

image 2024 05 08 14 53 34 101
Figure 4. 图 13.18:字母表 [a, b] 的索引 n 叉树

然后,通过从根到给定索引遍历树,并将沿途找到的节点元素附加到正在计算的变化中,可以获得与索引相对应的变化。 例如,给定图 13.18 中的树,对应于索引 13 的变体将为“bba”。

我们将利用 indexed-string-variation 包 (nodejsdp.link/indexedstring-variation) 来帮助我们根据 n 叉树中的索引计算相应的变体。 这个操作是在worker中完成的,所以我们在ventilator中要做的就是产生索引的区间并提供给worker,worker反过来会计算这些区间所代表的字符的所有变体。

现在,在了解了必要的理论之后,让我们开始通过实现负责生成要分发的任务的组件(generateTasks.js)来构建我们的系统:

export function* generateTasks(searchHash, alphabet,
                               maxWordLength, batchSize) {
    let nVariations = 0
    for (let n = 1; n <= maxWordLength; n++) {
        nVariations += Math.pow(alphabet.length, n)
    }
    console.log('Finding the hashsum source string over ' +
        `${nVariations} possible variations`)
    let batchStart = 1
    while (batchStart <= nVariations) {
        const batchEnd = Math.min(
            batchStart + batchSize - 1, nVariations)
        yield {
            searchHash,
            alphabet: alphabet,
            batchStart,
            batchEnd
        }
        batchStart = batchEnd + 1
    }
}

generateTasks()生成器创建batchSize大小的整数间隔,从1开始(我们排除0,它是树的根,对应于空变体),并以给定字母表的最大可能索引(nVariations)结束 提供的最大字长 (maxLength)。 然后,我们将有关任务的所有数据打包到一个对象中并将其交给调用者。

请考虑,要生成更长的字符串,可能需要切换到 BigInt (nodejsdp.link/bigint) 来表示其索引,因为 JavaScript 目前可管理的最大安全整数为 253 – 1,即 Number.MAX_SAFE_INTEGER 的值。 请注意,使用非常大的整数可能会对变化生成器的性能产生负面影响。

现在,我们需要实现生产者的逻辑,它负责将任务分配给所有工作人员(在 Producer.js 文件中):

import zmq from 'zeromq'
import delay from 'delay'
import {generateTasks} from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const [, , maxLength, searchHash] = process.argv

async function main() {
    const ventilator = new zmq.Push() // (1)
    await ventilator.bind('tcp://*:5016')
    await delay(1000) // wait for all the workers to connect
    const generatorObj = generateTasks(searchHash, ALPHABET,
        maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        await ventilator.send(JSON.stringify(task)) // (2)
    }
}

main().catch(err => console.error(err))

为了避免生成太多变体,我们的生成器仅使用英文字母表中的小写字母,并对生成的单词大小设置限制。 此限制作为命令行参数 (maxLength) 中的输入以及要匹配的哈希和 (searchHash) 提供。

但我们最感兴趣分析的部分是如何在工作人员之间分配任务:

  1. 我们首先创建一个 PUSH 套接字,并将其绑定到本地端口 5016,这是工作人员的 PULL 套接字将连接以接收其任务的地方。 然后,我们等待 1 秒让所有工作人员连接:我们这样做是因为如果生产者在工作人员已经运行时启动,工作人员可能会在不同时间连接(因为它们基于计时器的重新连接算法),这可能会导致第一个 连接worker来接收大部分任务。

  2. 对于每个生成的任务,我们将其字符串化并使用通风机套接字的 send() 函数将其发送给工作人员。 每个连接的工作人员将按照循环方法接收不同的任务。

实现 worker

现在是时候实现worker了,但首先,让我们创建一个组件来处理 传入任务(在 processTask.js 文件中):

import isv from 'indexed-string-variation'
import {createHash} from 'crypto'

export function processTask(task) {
    const variationGen = isv.generator(task.alphabet)
    console.log('Processing from ' +
        `${variationGen(task.batchStart)} (${task.batchStart}) ` +
        `to ${variationGen(task.batchEnd)} (${task.batchEnd})`)
    for (let idx = task.batchStart; idx <= task.batchEnd; idx++) {
        const word = variationGen(idx)
        const shasum = createHash('sha1')
        shasum.update(word)
        const digest = shasum.digest('hex')
        if (digest === task.searchHash) {
            return word
        }
    }
}

processTask() 函数的逻辑非常简单:它迭代给定间隔内的索引,然后为每个索引生成相应的字符(单词)变体。 接下来,它计算该单词的 SHA1 校验和,并尝试将其与任务对象中传递的 searchHash 进行匹配。 如果两个摘要匹配,则它将源单词返回给调用者。

现在我们准备好实现我们的worker (worker.js)的主要逻辑:

import zmq from 'zeromq'
import {processTask} from './processTask.js'

async function main() {
    const fromVentilator = new zmq.Pull()
    const toSink = new zmq.Push()
    fromVentilator.connect('tcp://localhost:5016')
    toSink.connect('tcp://localhost:5017')
    for await (const rawMessage of fromVentilator) {
        const found = processTask(JSON.parse(rawMessage.toString()))
        if (found) {
            console.log(`Found! => ${found}`)
            await toSink.send(`Found: ${found}`)
        }
    }
}

main().catch(err => console.error(err))

正如我们所说,我们的工作线程代表我们架构中的一个临时节点,因此,它的套接字应该连接到远程节点,而不是监听传入的连接。 这正是我们在工作线程中所做的,我们创建了两个套接字:

  • 连接到通风机的 PULL 套接字,用于接收任务

  • 连接到接收器的 PUSH 套接字,用于传播结果

除此之外,我们完成的工作 Worker 非常简单:它处理收到的每个任务,如果找到匹配项,我们通过 toSink 套接字向结果收集器发送一条消息。

实现结果收集器

对于我们的示例,结果收集器(接收器)是一个非常基本的程序,它只是将工作人员收到的消息打印到控制台。 Collector.js文件内容如下:

import zmq from 'zeromq'

async function main() {
    const sink = new zmq.Pull()
    await sink.bind('tcp://*:5017')
    for await (const rawMessage of sink) {
        console.log('Message from worker: ', rawMessage.toString())
    }
}

main().catch(err => console.error(err))

有趣的是,结果收集器(作为生产者)也是我们架构的持久节点,因此我们绑定它的 PULL 套接字,而不是将其显式连接到工作线程的 PUSH 套接字。

运行应用程序

我们现在准备启动我们的应用程序; 让我们启动几个工作人员和结果收集器(每个工作人员都在不同的终端中):

node worker.js
node worker.js
node collector.js

然后是时候启动生成器,指定要生成的单词的最大长度以及我们想要匹配的 SHA1 校验和。 以下是示例命令行:

node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

当上面的命令运行时,生产者将开始生成任务并将它们分发给我们启动的工作人员集。 我们告诉生产者生成所有可能的包含 4 个小写字母的单词(因为我们的字母表仅包含小写字母),并且我们还提供了一个与秘密 4 个字母单词相对应的 SHA1 校验和示例。

计算结果(如果有)将出现在结果收集器应用程序的终端中。

请注意,考虑到 ZeroMQ 中 PUSH/PULL 套接字的低级性质,特别是缺乏消息确认,如果节点崩溃,那么它正在处理的所有任务都将丢失。 可以在 ZeroMQ 之上实现自定义确认机制,但我们将其作为读者的练习。

此实现的另一个已知限制是,如果找到匹配项,工作人员将不会停止处理任务。 故意省略此功能是为了使示例尽可能集中于正在讨论的模式。 您可以尝试添加此“停止”机制作为练习。

AMQP 中的管道和竞争消费者

在上一节中,我们了解了如何在对等环境中实现并行管道。 现在,我们将探讨使用 RabbitMQ 将这种模式应用于基于代理的架构中时的情况。

点对点通信和竞争消费者

在对等配置中,管道是一个非常容易想象的概念。 然而,有了中间的消息代理,系统的各个节点之间的关系就有点难以理解了:代理本身充当我们通信的中介,而且通常我们并不真正知道谁在使用。 对方正在监听消息。 例如,当我们使用 AMQP 发送消息时,我们不会将其直接传递到目的地,而是传递到交换器,然后传递到队列。 最后,代理将根据交换、绑定和目标队列中定义的规则来决定将消息路由到何处。

如果我们想使用像 AMQP 这样的系统实现管道和任务分发模式,我们必须确保每条消息仅被一个消费者接收,但这无法保证一个交换是否可能绑定到多个消费者 队列。 那么,解决方案就是直接将消息发送到目标队列,完全绕过交换。 这样,我们就可以确保只有一个队列会收到该消息。 这种通信模式称为点对点。

一旦我们能够将一组消息直接发送到单个队列,我们就已经实现了任务分配模式的一半。 事实上,下一步是自然而然的:当多个消费者正在侦听同一个队列时,消息将遵循扇出分布模式均匀地分布在它们之间。 正如我们已经提到的,在消息代理的上下文中,这被称为竞争消费者模式。

接下来,我们将使用 AMQP 重新实现简单的哈希和破解程序,以便我们可以理解与上一节中讨论的点对点方法的差异。

使用 AMQP 实现哈希和破解器

我们刚刚了解到,交换器是代理中将消息多播到一组消费者的点,而队列是消息进行负载平衡的地方。 记住这些知识,现在让我们在 AMQP 代理(在我们的例子中是 RabbitMQ)之上实现我们的暴力哈希破解器。 下图为您展示了我们要实现的系统的概述:

image 2024 05 08 15 45 43 439
Figure 5. 图 13.19:使用消息队列代理的任务分发架构

正如我们所讨论的,要将一组任务分配给多个工作人员,我们需要使用单个队列。 在图 13.19 中,我们将其称为任务队列。 在任务队列的另一侧,我们有一组工作人员,它们是竞争的消费者:换句话说,每个工作人员都会从队列中接收不同的消息。 其效果是多个任务将在不同的工作线程上并行执行。

Worker产生的结果被发布到另一个队列中,我们称之为结果队列,然后被结果收集器消费,这实际上相当于一个接收器。 在整个架构中,我们没有使用任何交换器,我们只是将消息直接发送到目的地队列,实现点对点类型的通信。

实现生产者

让我们看看如何实现这样一个系统,从生产者开始(在 Producer.js 文件中):

import amqp from 'amqplib'
import {generateTasks} from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const [, , maxLength, searchHash] = process.argv

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createConfirmChannel() // (1)
    await channel.assertQueue('tasks_queue')
    const generatorObj = generateTasks(searchHash, ALPHABET,
        maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        channel.sendToQueue('tasks_queue', // (2)
            Buffer.from(JSON.stringify(task)))
    }
    await channel.waitForConfirms()
    channel.close()
    connection.close()
}

main().catch(err => console.error(err))

正如我们所看到的,没有任何交换或绑定使得基于 AMQP 的应用程序的设置变得更加简单。 不过,有一些细节需要注意:

  1. 我们不是创建标准通道,而是创建一个确认通道。 这是必要的,因为它创建了一个具有一些额外功能的通道,特别是,它提供了 waitForConfirms() 函数,我们稍后在代码中使用该函数来等待代理确认接收到所有消息。 这对于防止应用程序在从本地队列分派所有消息之前过早关闭与代理的连接是必要的。

  2. 生产者的核心是 channel.sendToQueue() API,这对我们来说实际上是新的。 正如其名称所示,该 API 负责将消息直接传递到队列(在我们的示例中为tasks_queue),绕过任何交换或路由。

实现 worker

在任务队列的另一侧,我们让工作人员监听传入的任务。 让我们更新现有的 worker.js 模块的代码以使用 AMQP:

import amqp from 'amqplib'
import {processTask} from './processTask.js'

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    const {queue} = await channel.assertQueue('tasks_queue')
    channel.consume(queue, async (rawMessage) => {
        const found = processTask(
            JSON.parse(rawMessage.content.toString()))
        if (found) {
            console.log(`Found! => ${found}`)
            await channel.sendToQueue('results_queue',
                Buffer.from(`Found: ${found}`))
        }
        await channel.ack(rawMessage)
    })
}

main().catch(err => console.error(err))

我们的新工作人员也与我们在上一节中使用 ZeroMQ 实现的工作人员非常相似,除了与消息交换相关的部分。 在前面的代码中,我们可以看到如何首先获取对名为tasks_queue的队列的引用,然后开始使用channel侦听传入的任务。 消耗()。 然后,每次找到匹配项时,我们都会通过 results_queue 将结果发送到收集器,再次使用点对点通信。 同样重要的是要注意在消息被完全处理后我们如何使用 channel.ack() 确认每条消息。

如果启动多个工作线程,它们都将监听同一个队列,从而导致消息在它们之间进行负载平衡(它们成为竞争消费者)。

实现结构收集器

结果收集器也是一个简单的模块,只是将收到的任何消息打印到控制台。 这是在 Collector.js 文件中实现的,如下所示:

import amqp from 'amqplib'

async function main() {
    const connection = await amqp.connect('amqp://localhost')
    const channel = await connection.createChannel()
    const {queue} = await channel.assertQueue('results_queue')
    channel.consume(queue, msg => {
        console.log(`Message from worker: ${msg.content.toString()}`)
    })
}

main().catch(err => console.error(err))

运行应用程序

现在一切准备就绪,可以尝试我们的新系统了。 首先,确保 RabbitMQ 服务器正在运行,然后您可以启动几个工作程序(在两个单独的终端中),它们都将连接到同一个队列(tasks_queue),以便每条消息都将在它们之间实现负载平衡:

node worker.js
node worker.js

然后,您可以运行收集器模块,然后运行生产者(通过提供最大字长和要破解的哈希值):

node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

至此,我们使用 AMQP 实现了消息管道和竞争消费者模式。

有趣的是,我们基于 AMQP 的哈希和破解程序的新版本需要稍长的时间(与基于 ZeroMQ 的版本相比)来执行所有任务并找到匹配项。 这是一个实际演示,说明与更底层的点对点方法相比,代理如何实际引入负面性能影响。 然而,我们不要忘记,与 ZeroMQ 实现相比,使用 AMQP 可以开箱即用得多。 例如,通过 AMQP 实现,如果一个工作线程崩溃,它正在处理的消息不会丢失,并且最终会传递给另一个工作线程。 因此,请记住,在为您的应用程序选择正确的方法时,始终要着眼于大局:与系统整体复杂性的大幅增加或某些重要功能的缺乏相比,微小的延迟可能毫无意义。

现在,让我们考虑另一种基于代理的方法来实现任务分配模式,这次是基于 Redis Streams 构建的。

使用 Redis Streams 分发任务

在了解如何使用 ZeroMQ 和 AMQP 实现任务分配模式之后,我们现在将了解如何利用 Redis Streams 实现此模式。

Redis 消费者组

在深入研究一些代码之前,我们需要了解 Redis 的一个关键功能,该功能允许我们使用 Redis Streams 实现任务分配模式。 此功能称为消费者组,是 Redis Streams 之上的竞争消费者模式(添加了一些有用的附件)的实现。

消费者组是一个有状态的实体,由名称标识,由一组由名称标识的消费者组成。 当组中的消费者尝试读取流时,他们将以循环配置接收记录。

每条记录都必须明确确认,否则记录将保持挂起状态。 每个消费者只能访问自己的待处理记录历史记录,除非它明确声明另一个消费者的记录。 如果消费者在处理记录时崩溃,这非常有用。 当消费者重新上线时,它应该做的第一件事是检索待处理记录列表并在从流中请求新记录之前处理这些记录。 图 13.20 直观地展示了消费者组在 Redis 中的工作方式。

image 2024 05 08 15 53 09 363
Figure 6. 图 13.20:Redis Stream 消费者组

我们可以注意到,当组中的两个消费者尝试从流中读取数据时,他们如何接收两个不同的记录(B 代表消费者 1,C 代表消费者 2)。 消费者组还存储最后检索的记录(记录C)的ID,以便在连续的读取操作中消费者组知道下一条要读取的记录是什么。 我们还可以注意到消费者 1 如何有一个待处理记录 (A),这是一条仍在处理或无法处理的记录。 消费者 1 可以实现重试算法,以确保处理分配给自己的所有待处理记录。

一个 Redis Stream 可以有多个消费者组。 这样就可以同时对同一数据应用不同类型的处理。

现在让我们将刚刚学到的有关 Redis 消费者组的知识付诸实践,以实现我们的哈希和破解程序。

使用 Redis Streams 实现哈希和破解器

我们使用 Redis Streams 的哈希和破解程序的架构将与之前的 AMQP 示例非常相似。 事实上,我们将有两个不同的流(在 AMQP 示例中它们是队列):一个流用于保存要处理的任务 (tasks_stream),另一个流用于保存来自工作人员的结果 (results_stream)。

然后,我们将使用消费者组将任务从 tasks_ 流分发给我们应用程序的工作人员(我们的工作人员是消费者)。

实现生产者

让我们从实现生产者开始(在 Producer.js 文件中):

import Redis from 'ioredis'
import {generateTasks} from './generateTasks.js'

const ALPHABET = 'abcdefghijklmnopqrstuvwxyz'
const BATCH_SIZE = 10000
const redisClient = new Redis()
const [, , maxLength, searchHash] = process.argv

async function main() {
    const generatorObj = generateTasks(searchHash, ALPHABET,
        maxLength, BATCH_SIZE)
    for (const task of generatorObj) {
        await redisClient.xadd('tasks_stream', '*',
            'task', JSON.stringify(task))
    }
    redisClient.disconnect()
}

main().catch(err => console.error(err))

正如我们所看到的,新的 Producer.js 模块的实现对我们来说没有什么新鲜的。 事实上,我们已经非常清楚如何向流中添加记录; 我们所要做的就是调用 xadd(),如 “使用流进行可靠消息传递” 部分中所讨论的那样。

实现 worker

接下来,我们需要调整我们的工作线程,以便它可以使用消费者组与 Redis Stream 进行交互。 这是所有架构的核心,就像在这里,在工作人员中,我们利用消费者群体及其功能。 那么,让我们实现新的 worker.js 模块:

import Redis from 'ioredis'
import {processTask} from './processTask.js'

const redisClient = new Redis()
const [, , consumerName] = process.argv

async function main() {
    await redisClient.xgroup('CREATE', 'tasks_stream', // (1)
        'workers_group', '$', 'MKSTREAM')
        .catch(() => console.log('Consumer group already exists'))
    const [[, records]] = await redisClient.xreadgroup( // (2)
        'GROUP', 'workers_group', consumerName, 'STREAMS',
        'tasks_stream', '0')
    for (const [recordId, [, rawTask]] of records) {
        await processAndAck(recordId, rawTask)
    }
    while (true) {
        const [[, records]] = await redisClient.xreadgroup( // (3)
            'GROUP', 'workers_group', consumerName, 'BLOCK', '0',
            'COUNT', '1', 'STREAMS', 'tasks_stream', '>')
        for (const [recordId, [, rawTask]] of records) {
            await processAndAck(recordId, rawTask)
        }
    }
}

async function processAndAck(recordId, rawTask) { // (4)
    const found = processTask(JSON.parse(rawTask))
    if (found) {
        console.log(`Found! => ${found}`)
        await redisClient.xadd('results_stream', '*', 'result',
            `Found: ${found}`)
    }
    await redisClient.xack('tasks_stream', 'workers_group', recordId)
}

main().catch(err => console.error(err))

好吧,新的工作代码中有很多变化的部分。 那么,我们一步一步来分析:

  1. 首先我们需要确保消费者组存在,然后才能使用。 我们可以使用 xgroup 命令来做到这一点,我们使用以下参数调用该命令:

    1. 'CREATE' 是我们要创建消费者组时使用的关键字。 事实上,通过 xgroup 命令,我们还可以使用不同的子命令来销毁消费者组、删除消费者或更新最后读取的记录 ID。

    2. 'tasks_stream' 是我们要读取的流的名称。

    3. 'workers_group' 是消费者组的名称。

    4. 第四个参数表示消费者组应开始消费流中记录的记录 ID。 使用 '$'(美元符号)意味着消费者组应该从流中当前最后一条记录的 ID 开始读取流。

    5. 'MKSTREAM' 是一个额外参数,指示 Redis 创建流(如果尚不存在)。

  2. 接下来,我们读取属于当前消费者的所有待处理记录。 这些是消费者上次运行时由于应用程序突然中断(例如崩溃)而未处理的剩余记录。 如果同一消费者(具有相同名称)在上次运行期间正确终止,没有错误,则此列表很可能为空。 正如我们已经提到的,每个消费者只能访问自己的待处理记录。 我们使用 xreadgroup 命令和以下参数检索此列表:

    1. 'GROUP'、'workers_group'、consumerName 是必需的三重奏,我们在其中指定从命令行输入读取的消费者组的名称 ('workers_group') 和消费者的名称 (consumerName)。

    2. 然后我们用 “STREAMS”、“tasks_stream” 指定要读取的流。

    3. 最后,我们指定 “0” 作为最后一个参数,这是我们应该开始读取的 ID。 本质上,我们是说我们希望从第一条消息开始读取属于当前消费者的所有待处理消息。

  3. 然后,我们再次调用 xreadgroup(),但这次它具有完全不同的语义。 事实上,在这种情况下,我们希望开始从流中读取新记录(而不是访问消费者自己的历史记录)。 这可以通过以下参数列表来实现:

    1. 与之前调用 xreadgroup() 一样,我们使用三个参数指定要用于读取操作的消费者组:“GROUP”、“workers_group”、consumerName。

    2. 然后,我们指示如果当前没有可用的新记录,则调用应该阻塞,而不是返回空列表。 我们使用以下两个参数来做到这一点:“BLOCK”、“0”。 最后一个参数是超时,在此之后函数无论如何都会返回,即使没有结果。 “0” 意味着我们要无限期地等待。

    3. 接下来的两个参数 'COUNT' 和 '1' 告诉 Redis 我们有兴趣每次调用获取一条记录。

    4. 接下来,我们使用 “STREAMS”、“tasks_stream” 指定要读取的流。

    5. 最后,通过特殊 ID '>'(大于符号),我们表明我们对此消费者组尚未检索到的任何记录感兴趣。

  4. 最后,在 processAndAck() 函数中,我们检查是否有匹配项,如果是的话,我们将一条新记录附加到 results_stream 中。 最后,当 xreadgroup() 返回的记录的所有处理完成后,我们调用 Redis xack 命令来确认该记录已成功消费,这会导致该记录从当前消费者的待处理列表中删除。

呼!Worker.js 模块中发生了很多事情。有趣的是,大部分复杂性都来自于各种 Redis 命令所需的大量参数。

您可能会惊讶地发现这个示例只是触及了表面,因为关于 Redis Streams,特别是消费者群体,还有很多东西需要了解。 请查看官方 Redis 对 Streams 的介绍,了解更多详细信息:nodejsdp.link/redis-streams。

现在,一切都准备好了,我们可以尝试这个新版本的哈希和破解程序了。 让我们启动几个工作人员,但这次请记住为它们分配一个名称,该名称将用于在消费者组中识别它们:

node worker.js workerA
node worker.js workerB

然后,您可以像我们在前面的示例中所做的那样运行收集器和生成器:

node collector.js
node producer.js 4 f8e966d1e207d02c44511a58dccff2f5429e9a3b

我们对任务分配模式的探索到此结束,所以现在,我们将仔细研究请求/回复模式。