服务器多线程:Worker Threads
在 Node.js 中,通过 Node.js 官方推出的 Worker Threads 实现多线程。它的原理和方案与 HTML5 的 Web Worker 相似,但在具体使用上存在区别。
基本使用
在正式介绍 Worker Threads 之前,我们先通过单线程模式执行一个需要大量运算的 JavaScript 任务,该任务的代码如下。
console.time("主线程占用时长");
let runTimes = 30000000000;
let result = 0;
for (let i = 0; i < runTimes; i++) {
result += i;
}
console.timeEnd("主线程占用时长")
console.log("计算结果: "+result);
这段代码会计算 1+2+3+…+300000000 的和。下面在 Node.js 中执行以上代码,运算完成后的输出结果如下。
> 主线程占用时长: 32.617s
> 计算结果: 449999999970159100000
在此期间 Node.js 将无法进行其他处理。
下面将这段代码改写为多线程模式,用 Worker 线程来进行大量运算。
主线程对应 a.ts 文件,其代码如下。
import { Worker } from 'worker_threads';
console.time("主线程占用时长");
let myWorker = new Worker("./b.js");
myWorker.on("message", function (data) {
console.log('计算结果: ' + data);
});
let runTimes = 30000000000;
myWorker.postMessage(runTimes);
console.timeEnd("主线程占用时长");
在 a.ts 文件中,先从 Worker Threads 中引入 Worker 声明,然后新建一个 Worker 对象并引用 b.js 文件(b.ts 文件编译后为 b.js 文件),然后通过 Worker 对象的 on() 方法设置事件函数。on() 方法有两个参数:第一个参数为事件名称(在本例中为 message);第二个参数为事件函数。该函数有一个 data 参数,用来接收来自 b.js 文件的消息,并输出 b.js 文件返回的计算结果。最后调用 Worker 对象的 postMessage() 方法,将运算参数传递给 b.js 文件。
子线程对应 b.ts 文件,其代码如下。
import { parentPort } from 'worker_threads';
parentPort.on("message", function (data) {
console.time("子线程占用时长");
let result = 0;
for (let i = 0; i < data; i++) {
result += i;
}
parentPort.postMessage(result);
console.timeEnd("子线程占用时长");
});
b.ts 文件先从 Worker Threads 中引入 parentPort 对象,用它来和父线程通信。接着通过 parentPort 对象的 on() 方法来设置事件函数。on() 方法有两个参数:第一个参数为事件名称(本例中为 message);第二个参数为事件函数,该函数有一个 data 参数,用来接收来自父线程传来的值。以 data 作为运算参数,计算 1+2+3+…+data 的和。计算完成后,再调用 parentPort 对象的 postMessage() 方法,将运算结果传递给主线程。
输出结果如下。
> 主线程占用时长: 5.131ms
> 计算结果: 449999999970159100000
> 子线程占用时长: 32.140s
当子线程的任务执行完毕后,子线程并未关闭,而处于闲置状态,以等待下一次任务。为了节省资源,你必须及时关闭不再使用的子线程。
在主线程中,使用以下方法关闭子线程。
myWorker.terminate();
在子线程中,使用以下方法关闭子线程自身。
parentPort.close();
错误处理
主线程可以监听子线程是否出错。如果出错,子线程会触发主线程 Worker 对象的 error() 事件函数,并将错误对象作为传入参数。
接下来是错误处理的示例。主线程是 a.ts 文件,其代码如下。代码中通过 Worker 对象的 on() 方法指定了 error() 事件函数,在函数中将输出错误对象是否为 Error 的实例,以及错误对象的内容。
import { Worker } from 'worker_threads';
let myWorker = new Worker("./b.js");
myWorker.on("message", function (data) {
console.log('Result is ' + data);
});
myWorker.on("error", function (error) {
console.log(error instanceof Error);
console.log(error);
});
myWorker.postMessage(null);
子线程是 b.ts 文件,其代码如下。代码中通过 parentPort 对象的 on() 方法指定了 message() 事件函数,在函数中刻意抛出了一个自定义错误。
import { parentPort } from 'worker_threads';
parentPort.on("message", function (data) {
throw new Error("Something wrong!");
});
之后在 Node.js 中运行 a.js 文件,输出结果如下。
> true
> Error: Something wrong!
> at MessagePort.<anonymous>
> at MessagePort.[nodejs.internal.kHybridDispatch]
> at MessagePort.exports.emitMessage
其它事件
除 message 事件和 error 事件之外,Worker Threads 还支持注册其他事件,它们分别如下。
退出事件
退出事件的注册方式如下。
Worker对象.on('exit', (exitCode) => { /*自定义代码*/ });
当 Worker 线程退出时会触发退出事件,触发场景如下。
-
主线程执行了 myWorker.terminate(),exitCode 值为 1。
-
子线程执行了 parentPort.close(),exitCode 值为 0。
-
子线程产生未捕获的异常而中断,exitCode 值为 1。
注册一次性事件
前面介绍了通过 Worker 对象的 on() 方法注册永久性事件,每当事件触发时,对应的事件函数都会执行。
你还可以通过 Worker 对象的 once() 方法注册一次性事件。注册该事件后,只在首次触发事件时执行事件函数,后续触发将不再执行。
一次性事件支持的事件类型和永久性事件支持的一致,均支持 message、error、exit、online、messageerror 事件,二者的区别仅是注册方法不一致。例如,修改17.2.1节中的 a.ts 文件,在其中注册一次性事件,然后多次调用子线程,代码如下。
import { Worker } from 'worker_threads';
console.time("主线程占用时长");
let myWorker = new Worker("./b.js");
myWorker.once("message", function (data) {
console.log("once事件,计算结果:" + data);
});
myWorker.on("message", function (data) {
console.log("on事件,计算结果:" + data);
});
let runTimes = 30000000000;
myWorker.postMessage(runTimes);
myWorker.postMessage(runTimes);
console.timeEnd("主线程占用时长");
在分别注册了一次性 message 事件和永久性 message 事件后,代码中两次使用 postMessage 向子线程传输数据并计算,计算结果也将回传两次。执行上述代码后,输出结果如下,可以看到用 once() 方法注册的事件仅执行了一次,而用 on() 方法注册的事件每次触发时均会执行。
> 主线程占用时长: 5.269ms
> once事件,计算结果:449999999970159100000
> on事件,计算结果:449999999970159100000
> 子线程占用时长: 32.218s
> on事件,计算结果:449999999970159100000
> 子线程占用时长: 33.094s