运行 CPU 密集型任务
我们在异步请求批处理和缓存部分中实现的 totalSales() API(故意)在资源方面非常昂贵,并且运行需要几百毫秒。 尽管如此,调用 totalSales() 函数并不会影响应用程序处理并发传入请求的能力。 我们在第 1 章 Node.js 平台中了解到的事件循环应该可以解释这种行为:调用异步操作总是会导致堆栈回退到事件循环,使其可以自由地处理其他请求。
但是,当我们运行一个需要很长时间才能完成的同步任务并且在完成之前永远不会将控制权交还给事件循环时,会发生什么情况呢? 这种任务也被称为 CPU-bound,因为它的主要特点是重 CPU 利用率而不是重 I/O 操作。
让我们立即通过一个示例来了解这些类型的任务在 Node.js 中的行为方式。
解决子集和问题
现在让我们选择一个计算量大的问题作为我们实验的基础。 一个很好的候选者是子集和问题,它决定整数集(或多集)是否包含总和为零的非空子集。 例如,如果我们将集合 [1, 2, –4, 5, –3] 作为输入,则满足问题的子集为 [1, 2, –3] 和 [2, –4, 5, –3] 。
最简单的算法是检查任意大小的子集的每种可能组合,并且计算成本为 O(2n),或者换句话说,它随着输入的大小呈指数增长。 这意味着一组 20 个整数需要检查多达 1,048,576 个组合,这对于测试我们的假设来说还不错。 对于我们的示例,我们将考虑子集和问题的以下变体:给定一组整数,我们想要计算其总和等于给定任意整数而不仅仅是零的所有可能组合。
现在,让我们努力实现这样的算法。 首先,让我们创建一个名为 subsetSum.js 的新模块。 我们首先创建一个名为 SubsetSum 的类:
export class SubsetSum extends EventEmitter {
constructor (sum, set) {
super()
this.sum = sum
this.set = set
this.totalSubsets = 0
}
//...
javascript
SubsetSum 类扩展了 EventEmitter。 这允许我们每次找到与作为输入接收的总和相匹配的新子集时生成一个事件。 正如我们将看到的,这将为我们带来很大的灵活性。
接下来,让我们看看如何生成所有可能的子集组合:
_combine (set, subset) {
for (let i = 0; i < set.length; i++) {
const newSubset = subset.concat(set[i])
this._combine(set.slice(i + 1), newSubset)
this._processSubset(newSubset)
}
}
javascript
我们不会过多讨论该算法的细节,但有两点需要注意:
-
_combine() 方法是完全同步的。 它递归地生成每个可能的子集,而不会将控制权交还给事件循环。
-
每次生成新组合时,我们都会将其提供给_processSubset() 方法以进行进一步处理。
_processSubset() 方法负责验证给定子集的元素总和是否等于我们要查找的数字:
_processSubset (subset) {
console.log('Subset', ++this.totalSubsets, subset)
const res = subset.reduce((prev, item) => (prev + item), 0)
if (res === this.sum) {
this.emit('match', subset)
}
}
javascript
简而言之,_processSubset() 方法对子集应用归约操作,以计算其元素的总和。 然后,当结果总和等于我们感兴趣的总和 (this.sum) 时,它会发出匹配类型的事件。
最后,start() 方法将前面的所有部分组合在一起:
start () {
this._combine(this.set, [])
this.emit('end')
}
javascript
start() 方法通过调用 _combine() 触发所有组合的生成,最后发出结束事件,表明所有组合都已检查并且任何可能的匹配都已发出。 这是可能的,因为 _combine() 是同步的; 因此,函数一返回就会发出结束事件,这意味着所有组合都已计算完毕。
接下来,我们必须通过网络公开我们刚刚创建的算法。 与往常一样,我们可以使用简单的 HTTP 服务器来完成此任务。 特别是,我们希望创建一个接受格式为 /subsetSum?data=<Array>&sum=<Integer> 的请求的端点,该端点使用给定的整数数组和要匹配的总和来调用 SubsetSum 算法。
让我们在名为 index.js 的模块中实现这个简单的服务器:
import {createServer} from 'http'
import {SubsetSum} from './subsetSum.js'
createServer((req, res) => {
const url = new URL(req.url, 'http://localhost')
if (url.pathname !== '/subsetSum') {
res.writeHead(200)
return res.end('I\'m alive!\n')
}
const data = JSON.parse(url.searchParams.get('data'))
const sum = JSON.parse(url.searchParams.get('sum'))
res.writeHead(200)
const subsetSum = new SubsetSum(sum, data)
subsetSum.on('match', match => {
res.write(`Match: ${JSON.stringify(match)}\n`)
})
subsetSum.on('end', () => res.end())
subsetSum.start()
}).listen(8000, () => console.log('Server started'))
javascript
由于 SubsetSum 对象使用事件返回其结果,因此我们可以在算法生成匹配子集后立即实时流式传输它们。 另一个需要提到的细节是我们的服务器响应文本 “我还活着!” 每次我们点击与 /subsetSum 不同的 URL 时。 我们将使用它来检查服务器的响应能力,稍后我们将看到。
我们现在准备尝试我们的子集和算法。 想知道我们的服务器将如何处理它? 那么让我们启动它:
node index.js
bash
服务器启动后,我们就准备发送第一个请求。 让我们尝试使用 17 个随机数的多重集,这将生成 131,071 个组合,这个数量足以让我们的服务器忙碌一段时间:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[16,19,1,1,-16,9,1,-5,-2,17,-15,-97,19,-16,-4,-5,15]" --data-urlencode "sum=0"
bash
几秒钟后,我们应该看到来自服务器的结果。 但是,如果我们在第一个请求仍在运行时在另一个终端中尝试以下命令,我们会发现一个巨大的问题:
curl -G http://localhost:8000
bash
我们将立即看到最后一个请求挂起,直到第一个请求的子集和算法完成:服务器没有响应! 这其实是预料之中的。 Node.js 事件循环在单个线程中运行,如果该线程被长时间同步计算阻塞,它将无法执行单个周期以响应简单的 “我还活着!”
我们很快就明白,这种行为不适用于任何类型的旨在处理多个并发请求的应用程序。 但不要绝望。 在 Node.js 中,我们可以通过多种方式解决这种情况。 那么,我们来分析一下最流行的三种方法,分别是 “与setImmediate交错”、“使用外部进程” 和 “使用工作线程”。
与 setImmediate 交错
通常,受 CPU 限制的算法是基于一组步骤构建的。 这可以是一组递归调用、循环或它们的任何变体/组合。 因此,解决我们问题的一个简单方法是在每个步骤完成后(或在一定数量的步骤后)将控制权交还给事件循环。 这样,任何待处理的 I/O 仍然可以在长时间运行的算法产生 CPU 的时间间隔内由事件循环处理。 实现此目的的一个简单方法是安排算法的下一步在任何挂起的 I/O 请求之后运行。 这听起来像是 setImmediate() 函数的完美用例(我们已经在第 3 章 回调和事件 中介绍了这个 API)。
交织子集和算法的步骤
现在让我们看看该技术如何应用于我们的子集和算法。 我们所要做的就是稍微修改subsetSum.js 模块。 为了方便起见,我们将创建一个名为subsetSumDefer.js的新模块,以原始 subsetSum 类的代码为起点。
我们要做的第一个改变是添加一个名为 _combineInterleaved() 的新方法,这是我们正在实现的技术的核心:
_combineInterleaved (set, subset) {
this.runningCombine++
setImmediate(() => {
this._combine(set, subset)
if (--this.runningCombine === 0) {
this.emit('end')
}
})
}
javascript
正如我们所看到的,我们所要做的就是推迟使用 setImmediate() 调用原始(同步)_combine() 方法。 然而,现在,知道函数何时完成生成所有组合变得更加困难,因为算法不再同步。
为了解决这个问题,我们必须使用与我们在第 4 章“带有回调的异步控制流模式”中看到的异步并行执行流非常相似的模式来跟踪 _combine() 方法的所有正在运行的实例。 当 _combine() 方法的所有实例都完成运行时,我们可以发出结束事件,通知任何侦听器该过程已完成。
为了完成子集和算法的重构,我们需要进行更多调整。 首先,我们需要将 _combine() 方法中的递归步骤替换为相应的延迟步骤:
_combine (set, subset) {
for (let i = 0; i < set.length; i++) {
const newSubset = subset.concat(set[i])
this._combineInterleaved(set.slice(i + 1), newSubset)
this._processSubset(newSubset)
}
}
javascript
通过前面的更改,我们确保算法的每个步骤都将使用 setImmediate() 在事件循环中排队,因此在任何挂起的 I/O 请求之后执行,而不是同步运行。
另一个小调整是在 start() 方法中:
start () {
this.runningCombine = 0
this._combineInterleaved(this.set, [])
}
javascript
在前面的代码中,我们将 _combine() 方法的运行实例数初始化为 0。我们还将对 _combine() 的调用替换为对 _combineInterleaved() 的调用,并删除了结束事件的发射,因为现在已处理此事件 在 _combineInterleaved() 中异步执行。
通过最后的更改,我们的子集和算法现在应该能够按间隔交错的步骤运行其 CPUbound 代码,其中事件循环可以运行并处理任何其他挂起的请求。
最后缺少的一点是更新 index.js 模块,以便它可以使用新版本的 SubsetSum API。 这实际上是一个微不足道的改变:
import { createServer } from 'http'
// import { SubsetSum } from './subsetSum.js'
import { SubsetSum } from './subsetSumDefer.js'
createServer((req, res) => {
// ...
javascript
我们现在准备尝试这个新版本的子集和服务器。 再次启动服务器,然后尝试发送请求来计算与给定总和匹配的所有子集:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[16,19,1,1,-16,9,1,-5,-2,17,-15,-97,19,-16,-4,-5,15]" --data-urlencode "sum=0"
bash
当请求运行时,再次检查服务器是否响应:
curl -G http://localhost:8000
bash
太酷了 即使 SubsetSum 任务仍在运行,第二个请求也应几乎立即返回,这证明我们的技术运行良好。
关于交错方法的考虑
正如我们所看到的,在保持应用程序响应能力的同时运行 CPU 密集型任务并不那么复杂; 它只需要使用 setImmediate() 来安排算法的下一步在任何挂起的 I/O 之后运行。 然而,就效率而言,这并不是最好的方法。 事实上,推迟任务会带来很小的开销,乘以算法必须运行的所有步骤,可能会对整体运行时间产生重大影响。 在运行 CPU 密集型任务时,这通常是我们最不希望发生的事情。 缓解此问题的一个可能的解决方案是仅在一定数量的步骤之后使用 setImmediate(),而不是在每个步骤中都使用它,但这仍然无法解决问题的根源。
此外,如果每个步骤都需要很长时间才能运行,则此技术效果不佳。 事实上,在这种情况下,事件循环将失去响应能力,并且整个应用程序将开始滞后,这在生产环境中是不希望的。
请记住,这并不意味着我们应该不惜一切代价避免我们刚刚看到的技术。 在某些同步任务偶尔执行且运行时间不长的情况下,使用 setImmediate() 交错执行有时是避免阻塞事件循环的最简单、最有效的方法。
请注意,process.nextTick() 不能用于交错长时间运行的任务。 正如我们在第 3 章回调和事件中看到的,nextTick() 会安排一个任务在任何挂起的 I/O 之前运行,这将在重复调用的情况下导致 I/O 匮乏。 您可以通过将前面示例中的 setImmediate() 替换为 process.nextTick() 来自行验证这一点。 |
使用外部进程
推迟算法的步骤并不是我们运行 CPU 密集型任务的唯一选择。 防止事件循环阻塞的另一种模式是使用子进程。 我们已经知道,Node.js 在运行 Web 服务器等 I/O 密集型应用程序时会发挥最佳性能,这使我们能够凭借其异步架构来优化资源利用率。 因此,我们保持应用程序响应能力的最佳方法是不要在主应用程序上下文中运行昂贵的 CPU 密集型任务,而是使用单独的进程。 这具有三个主要优点:
-
同步任务可以全速运行,无需交错其执行步骤。
-
在 Node.js 中处理进程很简单,可能比修改算法以使用 setImmediate() 更容易,并且允许我们轻松使用多个处理器,而无需扩展主应用程序本身。
-
如果我们确实需要最大性能,则可以使用较低级语言创建外部进程,例如良好的旧 C 或更现代的编译语言(如 Go 或 Rust)。 始终使用最好的工具来完成工作!
Node.js 拥有丰富的 API 工具带,用于与外部进程交互。 我们可以在 child_process 模块中找到我们需要的一切。 此外,当外部进程只是另一个 Node.js 程序时,将其连接到主应用程序非常容易,并且允许与本地应用程序无缝通信。 这种神奇的发生要归功于 child_process.fork() 函数,它创建一个新的 Node.js 子进程,并自动创建与其通信的通道,允许我们使用与 EventEmitter 非常相似的接口交换信息。 让我们通过再次重构我们的子集和服务器来看看它是如何工作的。
将子集求和任务委托给外部进程
重构 SubsetSum 任务的目标是创建一个单独的子进程,负责处理同步处理,使主服务器的事件循环能够自由地处理来自网络的请求。 为了实现这一目标,我们将遵循以下秘诀:
-
我们将创建一个名为 processPool.js 的新模块,它允许我们创建正在运行的进程池。 启动新进程成本高昂且需要时间,因此保持它们不断运行并准备好处理请求可以让我们节省时间和 CPU 周期。 此外,该池还将帮助我们限制同时运行的进程数量,以防止应用程序遭受拒绝服务 (DoS) 攻击。
-
接下来,我们将创建一个名为 subsetSumFork.js 的模块,负责抽象在子进程中运行的SubsetSum任务。 它的作用将是与子进程通信并转发任务结果,就好像它们来自当前应用程序一样。
-
最后,我们需要一个工作进程(我们的子进程),一个新的 Node.js 程序,其唯一目标是运行子集和算法并将其结果转发给父进程。
DoS 攻击的目的是使其用户无法使用计算机。 这通常是通过利用漏洞耗尽此类机器的容量或通过请求使其大量过载(DDoS - 分布式 DoS)来实现的。 |
实现进程池
让我们从逐步构建 processPool.js 模块开始:
import { fork } from 'child_process'
export class ProcessPool {
constructor (file, poolMax) {
this.file = file
this.poolMax = poolMax
this.pool = []
this.active = []
this.waiting = []
}
//...
javascript
在模块的第一部分中,我们从 child_process 模块导入 fork() 函数,我们将用它来创建新进程。 然后,我们定义 ProcessPool 构造函数,它接受一个表示要运行的 Node.js 程序的文件参数,以及池中运行实例的最大数量 (poolMax)。 然后,我们定义三个实例变量:
-
pool 是可供使用的正在运行的进程的集合。
-
active 包含当前正在使用的进程的列表。
-
waiting 包含一个回调队列,用于处理由于缺乏可用进程而无法立即满足的所有请求。
ProcessPool 类的下一部分是 acquire() 方法,该方法负责在可用时最终返回一个可供使用的进程:
acquire()
{
return new Promise((resolve, reject) => {
let worker
if (this.pool.length > 0) { // (1)
worker = this.pool.pop()
this.active.push(worker)
return resolve(worker)
}
if (this.active.length >= this.poolMax) { // (2)
return this.waiting.push({resolve, reject})
}
worker = fork(this.file) // (3)
worker.once('message', message => {
if (message === 'ready') {
this.active.push(worker)
return resolve(worker)
}
worker.kill()
reject(new Error('Improper process start'))
})
worker.once('exit', code => {
console.log(`Worker exited with code ${code}`)
this.active = this.active.filter(w => worker !== w)
this.pool = this.pool.filter(w => worker !== w)
})
})
}
javascript
acquire() 的逻辑非常简单,解释如下:
-
如果池中有一个进程可供使用,我们只需将其移动到活动列表中,然后使用它来通过resolve( )。
-
如果池中没有可用进程,并且我们已经达到了运行进程的最大数量,则必须等待一个可用进程。 我们通过对外部 promise 的 resolve() 和 reject() 回调进行排队以供以后使用来实现这一点。
-
如果尚未达到运行进程的最大数量,我们将使用 child_process.fork() 创建一个新进程。 然后,我们等待来自新启动的进程的就绪消息,这表明该进程已启动并准备好接受新作业。 所有以 child_process.fork() 启动的进程都会自动提供这个基于消息的通道。
ProcessPool 类的最后一个方法是release(),其目的是在处理完进程后将其放回池中:
release (worker) {
if (this.waiting.length > 0) { // (1)
const { resolve } = this.waiting.shift()
return resolve(worker)
}
this.active = this.active.filter(w => worker !== w) // (2)
this.pool.push(worker)
}
javascript
release() 方法的工作原理如下:
-
如果等待列表中有请求,我们只需将要释放的工作线程传递给等待队列头部的resolve() 回调即可重新分配该工作进程。
-
否则,我们从活动列表中删除要释放的工作线程并将其放回池中。
正如我们所看到的,进程永远不会停止,而只是重新分配,这样我们就不必在每次请求时重新启动它们,从而节省了时间。 但是,重要的是要注意,这可能并不总是最佳选择,这在很大程度上取决于您的应用程序的要求。
用于减少长期内存使用并增加进程池弹性的其他可能的调整包括:
-
在闲置一段时间后终止空闲进程以释放内存。
-
添加一种机制来终止无响应的进程或重新启动已崩溃的进程。
在这个例子中,我们将保持进程池的实现简单,因为我们可以添加的细节确实是无穷无尽的。
与子进程通信
现在我们的 ProcessPool 类已经准备好了,我们可以使用它来实现 SubsetSumFork 类,该类的作用是与工作线程通信并转发它产生的结果。 正如我们已经提到的,使用 child_process.fork() 启动进程还为我们提供了一个简单的基于消息的通信通道,所以让我们通过实现 subsetSumFork.js 模块来看看它是如何工作的:
import {EventEmitter} from 'events'
import {dirname, join} from 'path'
import {fileURLToPath} from 'url'
import {ProcessPool} from './processPool.js'
const __dirname = dirname(fileURLToPath(import.meta.url))
const workerFile = join(__dirname,
'workers', 'subsetSumProcessWorker.js')
const workers = new ProcessPool(workerFile, 2)
export class SubsetSum extends EventEmitter {
constructor(sum, set) {
super()
this.sum = sum
this.set = set
}
async start() {
const worker = await workers.acquire() // (1)
worker.send({sum: this.sum, set: this.set})
const onMessage = msg => {
if (msg.event === 'end') { // (3)
worker.removeListener('message', onMessage)
workers.release(worker)
}
this.emit(msg.event, msg.data) // (4)
}
worker.on('message', onMessage) // (2)
}
}
javascript
首先要注意的是,我们使用文件 ./workers/subsetSumProcessWorker.js 作为子进程创建了一个新的 ProcessPool 对象。 我们还将池的最大容量设置为 2。
另一点值得一提的是,我们试图维护与原始 SubsetSum 类相同的公共 API。 事实上,SubsetSumFork 是一个 EventEmitter,其构造函数接受 sum 和 set,而 start() 方法触发算法的执行,这一次,算法在单独的进程上运行。 这是调用 start() 方法时发生的情况:
-
我们尝试从池中获取一个新的子进程。 当操作完成时,我们立即使用工作进程句柄向子进程发送一条消息,其中包含要运行的作业的数据。 send() API 由 Node.js 自动提供给所有以 child_process.fork() 启动的进程。 这本质上就是我们正在讨论的沟通渠道。
-
然后,我们开始使用 on() 方法监听工作进程发送的任何消息,以附加一个新的监听器(这也是由 child_process.fork() 启动的所有进程提供的通信通道的一部分)。
-
在 onMessage 监听器中,我们首先检查是否收到结束事件,这意味着 SubsetSum 任务已完成,在这种情况下,我们删除 onMessage 监听器并释放工作线程,将其放回池中。
-
工作进程以 {event, data} 格式生成消息,使我们能够无缝转发(重新发出)子进程生成的任何事件。
这就是 SubsetSumFork 包装器。 现在让我们实现工作线程(我们的子进程)。
很高兴知道子进程实例上可用的 send() 方法也可用于将套接字句柄从主应用程序传播到子进程(请参阅 nodejsdp.link/childprocess-send 中的文档)。 这实际上是集群模块使用的技术,用于在多个进程之间分配 HTTP 服务器的负载。 我们将在下一章中更详细地看到这一点。 |
实现 worker
现在让我们创建 workers/subsetSumProcessWorker.js 模块,我们的工作进程:
import {SubsetSum} from '../subsetSum.js'
process.on('message', msg => { // (1)
const subsetSum = new SubsetSum(msg.sum, msg.set)
subsetSum.on('match', data => { // (2)
process.send({event: 'match', data: data})
})
subsetSum.on('end', data => {
process.send({event: 'end', data: data})
})
subsetSum.start()
})
process.send('ready')
javascript
我们可以立即看到我们正在按原样重用原始(同步)SubsetSum。 现在我们处于一个单独的进程中,我们不必再担心阻塞事件循环; 所有 HTTP 请求将继续由主应用程序的事件循环处理,不会中断。
当工作进程作为子进程启动时,会发生以下情况:
-
它立即开始监听来自父进程的消息。 这可以通过 process.on() 函数(使用 child_process.fork() 启动进程时提供的通信 API 的一部分)轻松完成。
我们期望从父进程获得的唯一消息是向新的 SubsetSum 任务提供输入的消息。 一旦收到这样的消息,我们就会创建 SubsetSum 类的新实例,并注册匹配和结束事件的侦听器。 最后,我们使用subsetSum.start()开始计算。
-
每次从正在运行的算法接收到事件时,我们将其包装在格式为 {event, data} 的对象中并将其发送到父进程。然后,如我们在上一节中所见,这些消息在 subsetSumFork.js 模块中进行处理。
正如我们所看到的,我们只需包装我们已经构建的算法,而无需修改其内部结构。 这清楚地表明,只需使用我们刚刚看到的技术,就可以轻松地将应用程序的任何部分放入外部进程中。
当子进程不是 Node.js 程序时,我们刚才描述的简单通信通道(on()、send())不可用。 在这些情况下,我们仍然可以通过在暴露给父进程的标准输入和标准输出流之上实现我们自己的协议来与子进程建立接口。 要了解有关 child_process API 的所有功能的更多信息,您可以参考 Node.js 官方文档:nodejsdp.link/child_process。 |
多进程方法的注意事项
与往常一样,要尝试这个新版本的子集和算法,我们只需替换 HTTP 服务器使用的模块(index.js 文件):
import { createServer } from 'http'
// import { SubsetSum } from './subsetSum.js'
// import { SubsetSum } from './subsetSumDefer.js'
import { SubsetSum } from './subsetSumFork.js'
createServer((req, res) => {
//...
javascript
我们现在可以再次启动服务器并尝试发送示例请求:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[16,19,1,1,-16,9,1,-5,-2,17,-15,-97,19,-16,-4,-5,15]" --data-urlencode "sum=0"
bash
至于我们之前看到的交错方法,使用新版本的 SubsetSum 模块,在运行 CPU 密集型任务时事件循环不会被阻塞。 这可以通过发送另一个并发请求来确认,如下所示:
curl -G http://localhost:8000
bash
前面的命令应该立即返回文本 “我还活着!”
更有趣的是,我们还可以尝试同时启动两个 SubsetSum 任务,我们将能够看到它们将使用两个不同处理器的全部功能来运行(当然,如果您的系统有多个处理器)。 相反,如果我们尝试同时运行三个 SubsetSum 任务,结果应该是最后一个启动的任务将挂起。 这并不是因为主进程的事件循环被阻塞,而是因为我们为 SubsetSum 任务设置了两个进程的并发限制,这意味着只要两个进程中至少有一个进程完成,第三个请求就会被处理。 池再次可用。
正如我们所看到的,与交错方法相比,多进程方法具有许多优点。 首先,它在运行算法时不会引入任何计算损失。 其次,它可以充分利用多处理器机器。
现在,让我们看看使用线程而不是进程的替代方法。
使用工作线程
从 Node 10.5.0 开始,我们有了一种新机制,可以在主事件循环之外运行 CPU 密集型算法,称为工作线程。 工作线程可以被视为 child_process.fork() 的轻量级替代品,并具有一些额外的好处。 与进程相比,工作线程具有更小的内存占用和更快的启动时间,因为它们在主进程内运行但在不同的线程内。
尽管工作线程基于真实线程,但它们不允许其他语言(例如 Java 或 Python)支持的深度同步和共享功能。 这是因为 JavaScript 是一种单线程语言,它没有任何内置机制来同步多个线程对变量的访问。 具有线程的 JavaScript 根本就不是 JavaScript。 在 Node.js 中发挥线程的所有优点而无需实际更改语言的解决方案是工作线程。
工作线程本质上是默认情况下不与主应用程序线程共享任何内容的线程; 它们在自己的 V8 实例中运行,具有独立的 Node.js 运行时和事件循环。 由于基于消息的通信通道、ArrayBuffer 对象的传输以及使用其同步由用户管理的 SharedArrayBuffer 对象(通常在 Atomics 的帮助下),与主线程的通信成为可能。
您可以在以下文章中阅读有关 SharedArrayBuffer 和 Atomics 的更多信息:nodejsdp.link/shared-array-buffer。 尽管本文重点讨论 Web Workers,但许多概念与 Node.js 的工作线程类似。 |
工作线程与主线程的这种广泛隔离保留了语言的完整性。 同时,基本的通信设施和数据共享能力足以满足 99% 的用例。
现在,让我们在 SubsetSum 示例中使用工作线程。
在工作线程中运行子集求和任务
工作线程 API 与 ChildProcess 有很多共同点,因此对代码的更改很小。
首先,我们需要创建一个名为 ThreadPool 的新类,它是我们的 ProcessPool,适合与工作线程而不是进程一起操作。 以下代码显示了新的 ThreadPool 类和 ProcessPool 类之间的差异。 acquire() 方法只有一些差异,已突出显示; 其余代码是相同的:
import {Worker} from 'worker_threads'
export class ThreadPool {
// ...
acquire() {
return new Promise((resolve, reject) => {
let worker
if (this.pool.length > 0) {
worker = this.pool.pop()
this.active.push(worker)
return resolve(worker)
}
if (this.active.length >= this.poolMax) {
return this.waiting.push({resolve, reject})
}
worker = new Worker(this.file)
worker.once('online', () => {
this.active.push(worker)
resolve(worker)
})
worker.once('exit', code => {
console.log(`Worker exited with code ${code}`)
this.active = this.active.filter(w => worker !== w)
this.pool = this.pool.filter(w => worker !== w)
})
})
}
//...
}
javascript
接下来,我们需要调整工作线程并将其放置在名为 subsetSumThreadWorker.js 的新文件中。 与我们的旧工作人员的主要区别在于,我们必须使用 parentPort.postMessage() 和 parentPort.on(),而不是使用 process.send() 和 process.on():
import {parentPort} from 'worker_threads'
import {SubsetSum} from '../subsetSum.js'
parentPort.on('message', msg => {
const subsetSum = new SubsetSum(msg.sum, msg.set)
subsetSum.on('match', data => {
parentPort.postMessage({event: 'match', data: data})
})
subsetSum.on('end', data => {
parentPort.postMessage({event: 'end', data: data})
})
subsetSum.start()
})
javascript
同样,模块 subsetSumThreads.js 本质上与 subsetSumFork.js 模块相同,除了几行代码之外,这些代码在以下代码片段中突出显示:
import {EventEmitter} from 'events'
import {dirname, join} from 'path'
import {fileURLToPath} from 'url'
import {ThreadPool} from './threadPool.js'
const __dirname = dirname(fileURLToPath(import.meta.url))
const workerFile = join(__dirname,
'workers', 'subsetSumThreadWorker.js')
const workers = new ThreadPool(workerFile, 2)
export class SubsetSum extends EventEmitter {
constructor(sum, set) {
super()
this.sum = sum
this.set = set
}
async start() {
const worker = await workers.acquire()
worker.postMessage({sum: this.sum, set: this.set})
const onMessage = msg => {
if (msg.event === 'end') {
worker.removeListener('message', onMessage)
workers.release(worker)
}
this.emit(msg.event, msg.data)
}
worker.on('message', onMessage)
}
}
javascript
正如我们所看到的,调整现有应用程序以使用工作线程而不是分叉进程是一项微不足道的操作。 这是因为这两个组件的 API 非常相似,而且还因为工作线程与成熟的 Node.js 进程有很多共同点。
最后,我们需要更新 index.js 模块,以便它可以使用新的 subsetSumThreads.js 模块,正如我们在算法的其他实现中看到的那样:
import { createServer } from 'http'
// import { SubsetSum } from './subsetSum.js'
// import { SubsetSum } from './subsetSumDefer.js'
// import { SubsetSum } from './subsetSumFork.js'
import { SubsetSum } from './subsetSumThreads.js'
createServer((req, res) => {
// ...
javascript
现在,您可以尝试使用工作线程的新版本的子集和服务器。 对于前两个实现,主应用程序的事件循环不会被子集和算法阻塞,因为它在单独的线程中运行。
我们看到的示例仅使用了工作线程提供的所有功能的一小部分。 对于更高级的主题,例如传输 ArrayBuffer 对象或 SharedArrayBuffer 对象,您可以阅读官方 API 文档:nodejsdp.link/worker-threads。 |
在生产环境中运行 CPU 密集型任务
到目前为止我们看到的示例应该可以让您了解我们可以使用哪些工具在 Node.js 中运行 CPU 密集型操作。 然而,进程池和线程池等组件是复杂的机器,需要适当的机制来处理超时、错误和其他类型的故障,为简洁起见,我们在实现中省略了这些机制。 因此,除非您有特殊要求,否则最好依赖更多经过实战检验的库进行生产使用。 其中两个库是workerpool (nodejsdp.link/workerpool) 和piscina (nodejsdp.link/piscina),它们基于我们在本节中看到的相同概念。 它们允许我们使用外部进程或工作线程来协调 CPU 密集型任务的执行。
最后一个观察是,我们必须考虑到,如果我们要运行特别复杂的算法,或者 CPU 密集型任务的数量超过单个节点的容量,我们可能不得不考虑跨多个节点扩展计算。 这是一个完全不同的问题,我们将在接下来的两章中详细了解这一点。