使用流的异步控制流模式

通过我们到目前为止所提供的示例,应该清楚流不仅可用于处理 I/O,而且还可作为一种优雅的编程模式,可用于处理任何类型的数据。 但其优点并不仅仅在于其简单的外观。 正如我们将在本节中看到的,还可以利用流将 “异步控制流” 转变为 “流控制”。

顺序执行

默认情况下,流将按顺序处理数据。 例如,Transform 流的 _transform() 函数永远不会使用下一个数据块来调用,直到通过调用 callback() 完成前一个调用。 这是流的一个重要属性,对于以正确的顺序处理每个块至关重要,但它也可以用于将流变成传统控制流模式的优雅替代方案。

一些代码总是比太多解释更好,所以让我们通过一个示例来演示如何使用流按顺序执行异步任务。 让我们创建一个函数,用于连接作为输入接收的一组文件,并确保遵循它们提供的顺序。 让我们创建一个名为 concat-files.js 的新模块并定义其内容如下:

import { createWriteStream, createReadStream } from 'fs'
import { Readable, Transform } from 'stream'

export function concatFiles (dest, files) {
    return new Promise((resolve, reject) => {
        const destStream = createWriteStream(dest)
        Readable.from(files) // (1)
            .pipe(new Transform({ // (2)
                objectMode: true,
                transform (filename, enc, done) {
                    const src = createReadStream(filename)
                    src.pipe(destStream, { end: false })
                    src.on('error', done)
                    src.on('end', done) // (3)
                }
            }))
            .on('error', reject)
            .on('finish', () => { // (4)
                destStream.end()
                resolve()
            })
    })
}

前面的函数通过将文件数组转换为流来实现对文件数组的顺序迭代。 该算法可以解释如下:

  1. 首先,我们使用 Readable.from() 从 files 数组创建一个 Readable 流。 该流以对象模式运行(使用 Readable.from() 创建的流的默认设置),并且它将发出文件名:每个块都是一个指示文件路径的字符串。 块的顺序遵循文件数组中文件的顺序。

  2. 接下来,我们创建一个自定义转换流来处理序列中的每个文件。 由于我们正在接收字符串,因此我们将选项 objectMode 设置为 true。 在我们的转换逻辑中,对于每个文件,我们创建一个可读流来读取文件内容并将其通过管道传输到 destStream(目标文件的可写流)。 我们通过在 pipeline() 选项中指定 { end: false } 来确保在源文件完成读取后不关闭 destStream。

  3. 当源文件的所有内容都已通过管道传输到 destStream 时,我们调用 did 函数来传达当前处理的完成情况,这是触发下一个文件的处理所必需的。

  4. 当所有文件都处理完毕后,将触发 finish 事件; 我们最终可以结束 destStream 并调用 concatFiles() 的 cb() 函数,这标志着整个操作的完成。

我们现在可以尝试使用我们刚刚创建的小模块。 让我们在一个名为 concat.js 的新文件中执行此操作:

import { concatFiles } from './concat-files.js'

async function main () {
    try {
        await concatFiles(process.argv[2], process.argv.slice(3))
    } catch (err) {
        console.error(err)
        process.exit(1)
    }

    console.log('All files concatenated successfully')
}
main()

现在,我们可以通过将目标文件作为第一个命令行参数传递,然后传递要连接的文件列表来运行前面的程序; 例如:

node concat.js all-together.txt file1.txt file2.txt

这应该创建一个名为 all-together.txt 的新文件,其中按顺序包含 file1.txt 和 file2.txt 的内容。

通过 concatFiles() 函数,我们能够仅使用流来获得异步顺序迭代。 这是一个优雅而紧凑的解决方案,它丰富了我们的工具带,以及我们在第 4 章 “带有回调的异步控制流模式” 和第 5 章 “带有 Promises 和 Async/Await 的异步控制流模式” 中探索的技术。

图案

使用流或流组合可以轻松地按顺序迭代一组异步任务。

在下一节中,我们将了解如何使用 Node.js 流来实现无序并行任务执行。

无序并行执行

我们刚刚看到流按顺序处理每个数据块,但有时,这可能会成为瓶颈,因为我们无法充分利用 Node.js 的并发性。 如果我们必须对每个数据块执行缓慢的异步操作,那么并行执行并加快整个过程可能会很有利。 当然,这种模式只能在每个数据块之间没有关系的情况下应用,这种情况对于对象流可能经常发生,但对于二进制流则很少发生。

当数据处理顺序很重要时,不能使用无序并行流。

为了并行执行转换流,我们可以应用在第 4 章 “带回调的异步控制流模式” 中学到的相同模式,但进行一些调整以使它们能够与流一起使用。 让我们看看这是如何工作的。

实现无序并行流

让我们立即通过示例演示如何实现无序并行流。 让我们创建一个名为 parallel-stream.js 的模块,并定义一个并行执行给定转换函数的通用 Transform 流:

import { Transform } from 'stream'

export class ParallelStream extends Transform {
    constructor (userTransform, opts) { // (1)
        super({ objectMode: true, ...opts })
        this.userTransform = userTransform
        this.running = 0
        this.terminateCb = null
    }

    _transform (chunk, enc, done) { // (2)
        this.running++
        this.userTransform(
            chunk,
            enc,
            this.push.bind(this),
            this._onComplete.bind(this)
        )
        done()
    }

    _flush (done) { // (3)
        if (this.running > 0) {
            this.terminateCb = done
        } else {
            done()
        }
    }

    _onComplete (err) { // (4)
        this.running--
        if (err) {
            return this.emit('error', err)
        }
        if (this.running === 0) {
            this.terminateCb && this.terminateCb()
        }
    }
}

让我们一步步分析这个新类:

  1. 如您所见,构造函数接受一个 userTransform() 函数,然后将该函数保存为实例变量。 我们调用父构造函数,为了方便起见,我们默认启用对象模式。

  2. 接下来,轮到 _transform() 方法了。 在此方法中,我们执行 userTransform() 函数,然后增加正在运行的任务的计数。 最后,我们通过调用 did() 通知 Transform 流当前转换步骤已完成。 并行触发另一个项目的处理的技巧正是如此。 在调用done()之前,我们不会等待userTransform()函数完成; 相反,我们立即这样做。 另一方面,我们为 userTransform() 提供了一个特殊的回调,即 this._onComplete() 方法。 这使我们能够在 userTransform() 执行完成时收到通知。

  3. _flush() 方法在流终止之前被调用,因此如果仍有任务在运行,我们可以通过不立即调用 did() 回调来暂停 finish 事件的释放。 相反,我们将其分配给 this.terminateCallback 变量。

  4. 要了解流如何正确终止,我们必须研究 _onComplete() 方法。 每次异步任务完成时都会调用最后一个方法。 它检查是否有更多任务正在运行,如果没有,则调用 this.terminateCallback() 函数,这将导致流结束,释放在 _flush() 方法中搁置的完成事件。

我们刚刚构建的 ParallelStream 类允许我们轻松创建一个并行执行其任务的 Transform 流,但有一个警告:它不会保留接收项目的顺序。 事实上,异步操作可以随时完成并推送数据,而不管它们何时开始。 我们立即了解到,此属性在数据顺序通常很重要的二进制流中不能很好地发挥作用,但它对于某些类型的对象流肯定很有用。

实现 URL 状态监控应用程序

现在,让我们将 ParallelStream 应用到一个具体示例中。 假设我们想要构建一个简单的服务来监视大量 URL 列表的状态。 假设所有这些 URL 都包含在一个文件中,并且以换行符分隔。

Streams 可以为这个问题提供一个非常高效和优雅的解决方案,特别是如果我们使用 ParallelStream 类来并行检查 URL。

让我们立即在名为 check-urls.js 的新模块中构建这个简单的应用程序:

import { pipeline } from 'stream'
import { createReadStream, createWriteStream } from 'fs'
import split from 'split'
import superagent from 'superagent'
import { ParallelStream } from './parallel-stream.js'

pipeline(
    createReadStream(process.argv[2]), // (1)
    split(), // (2)
    new ParallelStream( // (3)
        async (url, enc, push, done) => {
            if (!url) {
                return done()
            }
            try {
                await superagent.head(url, { timeout: 5 * 1000 })
                push(`${url} is up\n`)
            } catch (err) {
                push(`${url} is down\n`)
            }
            done()
        }
    ),
    createWriteStream('results.txt'), // (4)
    (err) => {
        if (err) {
            console.error(err)
            process.exit(1)
        }
        console.log('All urls have been checked')
    }
)

正如我们所看到的,使用流,我们的代码看起来非常优雅和简单:所有内容都包含在单个流管道中。 让我们看看它是如何工作的:

  1. 首先,我们从作为输入给出的文件创建一个可读流。

  2. 我们通过 split (nodejsdp.link/split) 传输输入文件的内容,这是一个 Transform 流,可确保每一行都在不同的块中发出。

  3. 然后,是时候使用我们的ParallelStream来检查URL了。 我们通过发送头请求并等待响应来做到这一点。 当操作完成时,我们将结果推送到流中。

  4. 最后,所有结果都通过管道传输到文件 results.txt 中。

现在,我们可以使用如下命令运行 check-urls.js 模块:

node check-urls.js urls.txt

这里,文件 urls.txt 包含 URL 列表(每行一个); 例如:

https://mario.fyi
https://loige.co
http://thiswillbedownforsure.com

当命令完成运行时,我们将看到创建了一个文件 results.txt。 这包含操作的结果; 例如:

http://thiswillbedownforsure.com is down
https://mario.fyi is up
https://loige.co is up

结果写入的顺序很可能与输入文件中指定 URL 的顺序不同。 这是明显的证据,表明我们的流并行执行其任务,并且它不会强制流中各个数据块之间的任何顺序。

出于好奇,我们可能想尝试用普通的 Transform 流替换 ParallelStream 并比较两者的行为和性能(您可能希望将其作为练习)。 直接使用 Transform 会慢很多,因为每个 URL 都会按顺序检查,但另一方面,文件 results.txt 中的结果顺序会被保留。

在下一节中,我们将了解如何扩展此模式以限制给定时间运行的并行任务的数量。

无序有限并行执行

如果我们尝试对包含数千或数百万个 URL 的文件运行 check-urls.js 应用程序,我们肯定会遇到问题。 我们的应用程序将同时创建数量不受控制的连接,并行发送大量数据,并可能破坏应用程序的稳定性和整个系统的可用性。 众所周知,控制负载和资源使用的解决方案是限制并行任务的并发数。

让我们通过创建一个 limit-parallel-stream.js 模块来看看它如何与流一起工作,该模块是我们在上一节中创建的 parallel-stream.js 的改编版。

让我们从它的构造函数开始看看它是什么样子的(我们将突出显示更改的部分):

export class LimitedParallelStream extends Transform {
    constructor (concurrency, userTransform, opts) {
        super({ ...opts, objectMode: true })
        this.concurrency = concurrency
        this.userTransform = userTransform
        this.running = 0
        this.continueCb = null
        this.terminateCb = null
    }
// ...

我们需要将并发限制作为输入,这一次,我们将保存两个回调,一个用于任何待处理的 _transform 方法(继续Cb - 接下来将详细介绍),另一个用于 _flush 方法的回调(terminateCb) 。

接下来是 _transform() 方法:

_transform (chunk, enc, done) {
    this.running++
    this.userTransform(
        chunk,
        enc,
        this.push.bind(this),
        this._onComplete.bind(this)
    )
    if (this.running < this.concurrency) {
        done()
    } else {
        this.continueCb = done
    }
}

这次,在 _transform() 方法中,我们必须检查是否有任何空闲执行槽,然后才能调用 did() 并触发下一项的处理。 如果我们已经达到了并发运行流的最大数量,我们可以简单地将 done() 回调保存在 continueCb 变量中,以便在任务完成后立即调用它。

_flush() 方法与 ParallelStream 类中的完全相同,因此让我们直接实现 _onComplete() 方法:

_onComplete (err) {
    this.running--
    if (err) {
        return this.emit('error', err)
    }
    const tmpCb = this.continueCb
    this.continueCb = null
    tmpCb && tmpCb()
    if (this.running === 0) {
        this.terminateCb && this.terminateCb()
    }
}

每次任务完成时,我们都会调用任何已保存的 continueCb(),这将导致流解除阻塞,从而触发下一个项目的处理。

LimitedParallelStream 类就是这样。 现在,我们可以在 check-urls.js 模块中使用它来代替 ParallelStream,并将任务的并发度限制为我们设置的值。

有序并行执行

我们之前创建的并行流可能会打乱发出数据的顺序,但在某些情况下这是不可接受的。 事实上,有时候,有必要按照接收到的顺序发出每个块。 然而,并非所有希望都破灭了:我们仍然可以并行运行变换函数; 我们所要做的就是对每个任务发出的数据进行排序,使其遵循接收数据的相同顺序。

该技术涉及使用缓冲区在每个正在运行的任务发出块时对块进行重新排序。 为了简洁起见,我们不打算提供此类流的实现,因为它对于本书的范围来说相当冗长。 我们要做的是重用 npm 上为此特定目的构建的可用包之一,即并行转换 (nodejsdp.link/paralleltransform)。

我们可以通过修改现有的 check-urls 模块来快速检查有序并行执行的行为。 假设我们希望结果以与输入文件中的 URL 相同的顺序写入,同时并行执行检查。 我们可以使用并行变换来做到这一点:

//...
import parallelTransform from 'parallel-transform'

pipeline(
    createReadStream(process.argv[2]),
    split(),
    parallelTransform(4, async function (url, done) {
        if (!url) {
            return done()
        }
        console.log(url)
        try {
            await request.head(url, { timeout: 5 * 1000 })
            this.push(`${url} is up\n`)
        } catch (err) {
            this.push(`${url} is down\n`)
        }
        done()
    }),
    createWriteStream('results.txt'),
    (err) => {
        if (err) {
            console.error(err)
            process.exit(1)
        }
        console.log('All urls have been checked')
    }
)

在此示例中,parallelTransform() 在对象模式下创建一个 Transform 流,以最大并发数 4 执行转换逻辑。如果我们尝试运行这个新版本的 check-urls.js,我们现在将看到结果。 txt 文件按照 URL 在输入文件中出现的顺序列出结果。 值得注意的是,即使输出的顺序与输入相同,异步任务仍然并行运行,并且可以按任何顺序完成。

当使用有序并行执行模式时,我们需要注意缓慢的项目会阻塞管道或无限期地增加内存。 事实上,如果有一个项目需要很长时间才能完成,根据模式的实现,它要么导致包含挂起的有序结果的缓冲区无限增长,要么整个处理阻塞,直到慢速项目完成 。 使用第一个策略,我们正在优化性能,而使用第二个策略,我们可以获得可预测的内存使用情况。 并行转换实现选择可预测的内存利用率,并维护一个内部缓冲区,该缓冲区的增长不会超过指定的最大并发数。

至此,我们结束了对流的异步控制流模式的分析。 接下来,我们将重点关注一些管道图案。