管道模式

与现实生活中的管道一样,Node.js 流也可以通过遵循不同的模式通过管道连接在一起。 事实上,我们可以将两个不同流的流合并为一个,将一个流的流拆分为两个或多个管道,或者根据条件重定向流。 在本节中,我们将探讨可应用于 Node.js 流的最重要的管道模式。

合并流

在本章中,我们强调了这样一个事实:流提供了一个简单的基础设施来模块化和重用我们的代码,但是这个难题中缺少最后一块:如果我们想要模块化和重用整个管道怎么办? 如果我们想要合并多个流,使它们从外部看起来就像一个流,该怎么办? 下图显示了这意味着什么:

image 2024 05 07 10 20 23 123
Figure 1. 图 6.6:组合流

从图 6.6 中,我们应该已经了解到它是如何工作的:

  • 当我们写入组合流时,我们实际上正在写入管道的第一个流。

  • 当我们从组合流中读取时,我们实际上是从管道的最后一个流中读取。

组合流通常是双工流,它是通过将第一个流连接到其可写端并将最后一个流连接到其可读端来构建的。

要从两个不同的流(一个可写和一个可读)创建双工流,我们可以使用 npm 模块,例如 duplexer2 (nodejsdp.link/duplexer2) 或 duplexify (nodejsdp.link/duplexify)。

但这还不够。 事实上,组合流的另一个重要特征是它必须捕获并传播从管道内的任何流发出的所有错误。 正如我们已经提到的,当我们使用 pipeline() 时,任何错误事件都不会自动沿着管道传播,我们应该显式地将错误侦听器附加到每个流。 我们看到我们可以使用 pipeline() 辅助函数来克服 pipeline() 的错误管理限制,但是 pipeline() 和 pipeline() 辅助函数的问题是这两个函数仅返回管道的最后一个流。 管道,因此我们只获得(最后一个)可读组件,而不是(第一个)可写组件。

我们可以使用以下代码片段非常轻松地验证这一点:

import { createReadStream, createWriteStream } from 'fs'
import { Transform, pipeline } from 'stream'
import { strict as assert } from 'assert'
const streamA = createReadStream('package.json')
const streamB = new Transform({
    transform (chunk, enc, done) {
        this.push(chunk.toString().toUpperCase())
        done()
    }
})
const streamC = createWriteStream('package-uppercase.json')

const pipelineReturn = pipeline(
    streamA,
    streamB,
    streamC,
    () => {
        // handle errors here
    })
assert.strictEqual(streamC, pipelineReturn) // valid

const pipeReturn = streamA.pipe(streamB).pipe(streamC)
assert.strictEqual(streamC, pipeReturn) // valid

从前面的代码中可以清楚地看出,仅使用 pipeline() 或 pipeline() 来构造组合流并不是一件容易的事。

回顾一下,组合流有两个主要优点:

  • 我们可以通过隐藏其内部管道将其作为黑盒重新分发。

  • 我们简化了错误管理,因为我们不必将错误侦听器附加到管道中的每个流,而只需附加到组合流本身。

组合流是一种非常常见的做法,因此如果我们没有任何特殊需求,我们可能只想重用现有的库,例如 Pumpify (nodejsdp.link/pumpify)。

该库提供了一个非常简单的界面。 事实上,要获得组合流,您所要做的就是调用 Pumpify(),传递管道中所需的所有流。 这与 pipeline() 的签名非常相似,只是没有回调:

const combinedStream = pumpify(streamA, streamB, streamC)

当我们做这样的事情时,pumpify 将从我们的流中创建一个管道,返回一个新的组合流,抽象出管道的复杂性,并提供前面讨论的优点。

如果您想知道如何构建像 Pumpify 这样的库,您应该在 GitHub 上查看其源代码 (nodejsdp.link/pumpify-gh)。 一个有趣的事实是,pumpify 在内部使用了 Pump (nodejsdp.link/pump),这是一个在 Node.js pipeline() 帮助器之前诞生的模块。 Pump 实际上是激发 pipeline() 开发的模块。 如果您比较它们的源代码,您会发现,毫不奇怪,这两个模块有很多共同点。

实现组合流

为了说明组合流的简单示例,让我们考虑以下两个转换流的情况:

  • 一个既压缩又加密数据

  • 一个既解密又解压缩数据

使用 Pumpify 等库,我们可以轻松构建这些流 通过组合我们已经从核心库中获得的一些流来实现流(在名为 combined-streams.js 的文件中):

import { createGzip, createGunzip } from 'zlib'
import {
    createCipheriv,
    createDecipheriv,
    scryptSync
} from 'crypto'
import pumpify from 'pumpify'
function createKey (password) {
    return scryptSync(password, 'salt', 24)
}

export function createCompressAndEncrypt (password, iv) {
    const key = createKey(password)
    const combinedStream = pumpify(
        createGzip(),
        createCipheriv('aes192', key, iv)
    )
    combinedStream.iv = iv
    return combinedStream
}

export function createDecryptAndDecompress (password, iv) {
    const key = createKey(password)
    return pumpify(
        createDecipheriv('aes192', key, iv),
        createGunzip()
    )
}

例如,我们现在可以像使用黑匣子一样使用这些组合流来创建一个小型应用程序,通过压缩和加密来归档文件。 让我们在名为 archive.js 的新模块中执行此操作:

import { createReadStream, createWriteStream } from 'fs'
import { pipeline } from 'stream'
import { randomBytes } from 'crypto'
import { createCompressAndEncrypt } from './combined-streams.js'

const [,, password, source] = process.argv
const iv = randomBytes(16)
const destination = `${source}.gz.enc`

pipeline(
    createReadStream(source),
    createCompressAndEncrypt(password, iv),
    createWriteStream(destination),
    (err) => {
        if (err) {
            console.error(err)
            process.exit(1)
        }
        console.log(`${destination} created with iv: ${iv.
        toString('hex')}`)
    }
)

请注意,我们不必担心 archiveFile 中有多少步骤。 事实上,我们只是将其视为当前管道中的单个流。 这使得我们的组合流可以轻松地在其他上下文中重用。

现在,要运行存档模块,只需在命令行参数中指定密码和文件:

node archive.js mypassword /path/to/a/file.txt

此命令将创建一个名为 /path/to/a/file.txt.gz.enc 的文件,并将生成的初始化向量打印到控制台。

现在,作为练习,您可以使用 createDecryptAndDecompress() 函数创建一个类似的脚本,该脚本采用密码、初始化向量和存档文件并将其取消存档。

在实际应用中,通常最好将初始化向量作为加密数据的一部分包含在内,而不是要求用户传递它。 实现此目的的一种方法是让存档流发出的前 16 个字节代表初始化向量。 取消归档实用程序需要进行相应更新,以在开始以流方式处理数据之前消耗前 16 个字节。 这种方法会增加一些额外的复杂性,这超出了本示例的范围,因此我们选择了更简单的解决方案。 一旦您对流感到满意,我们鼓励您尝试实现一个解决方案作为练习,其中初始化向量不必由用户传递。

通过这个例子,我们清楚地展示了组合流的重要性。 一方面,它允许我们创建可重用的流组合,另一方面,它简化了管道的错误管理。

分叉流

我们可以通过将单个可读流管道传输到多个可写流来执行流的分叉。 当我们想要将相同的数据发送到不同的目的地时,这很有用; 例如,两个不同的套接字或两个不同的文件。 当我们想要对同一数据执行不同的转换,或者当我们想要根据某些标准分割数据时,也可以使用它。 如果您熟悉 Unix 命令 tee (nodejsdp.link/tee),那么这与应用于 Node.js 流的概念完全相同!

图 6.7 为我们提供了该模式的图形表示:

image 2024 05 07 10 29 03 757
Figure 2. 图 6.7:分叉流

在 Node.js 中分叉流非常容易,但有一些注意事项需要记住。 让我们首先通过一个例子来讨论这个模式。 一旦我们手头有一个例子,就会更容易理解这种模式的注意事项。

实现多重校验和生成器

让我们创建一个小实用程序,输出给定文件的 sha1 和 md5 哈希值。 我们将这个新模块称为 generate-hashes.js:

import { createReadStream, createWriteStream } from 'fs'
import { createHash } from 'crypto'

const filename = process.argv[2]
const sha1Stream = createHash('sha1').setEncoding('hex')
const md5Stream = createHash('md5').setEncoding('hex')
const inputStream = createReadStream(filename)

inputStream
    .pipe(sha1Stream)
    .pipe(createWriteStream(`${filename}.sha1`))
inputStream
    .pipe(md5Stream)
    .pipe(createWriteStream(`${filename}.md5`))

很简单,对吧? inputStream 变量通过管道传输到一侧的 sha1Stream 和另一侧的 md5Stream。 幕后发生的一些事情需要注意:

  • md5Stream 和 sha1Stream 都将在 inputStream 结束时自动结束,除非我们在调用 pipeline() 时指定 { end: false } 作为选项。

  • 流的两个分支将接收相同的数据块,因此在对数据执行副作用操作时必须非常小心,因为这会影响我们向其发送数据的每个流。

  • 背压开箱即用; 来自 inputStream 的流将与分支中最慢的分支一样快。 换句话说,如果一个目的地长时间暂停源流以处理背压,则所有其他目的地也将等待。 此外,一个目标无限期地阻塞将会阻塞整个管道!

  • 如果我们在开始使用源数据(异步管道)后通过管道传输到其他流,则新流将仅接收新的数据块。 在这些情况下,我们可以使用 PassThrough 实例作为占位符来收集从开始使用流那一刻起的所有数据。 然后,可以在将来的任何时间读取 PassThrough 流,而不会丢失任何数据的风险。 请注意,如上一点所述,这种方法可能会产生背压并阻塞整个管道。

合并流

合并是与分叉相反的操作,涉及将一组可读流通过管道传输到单个可写流中,如图 6.8 所示:

image 2024 05 07 10 38 45 421
Figure 3. 图 6.8:合并流

一般来说,将多个流合并为一个流是一项简单的操作; 但是,我们必须注意处理结束事件的方式,因为使用默认选项(其中 { end: true })的管道会导致目标流在源之一结束后立即结束。 这通常会导致错误,因为其他活动源继续写入已终止的流。

此问题的解决方案是在将多个源管道传输到单个目标时使用选项 { end: false } ,然后仅当所有源都完成读取时才在目标上调用 end() 。

合并文本文件

举一个简单的例子,让我们实现一个小程序,它采用输出路径和任意数量的文本文件,然后将每个文件的行合并到目标文件中。 我们的新模块将被称为 merge-lines.js。 让我们从一些初始化步骤开始定义它的内容:

import { createReadStream, createWriteStream } from 'fs'
import split from 'split'

const dest = process.argv[2]
const sources = process.argv.slice(3)

在前面的代码中,我们只是加载所有依赖项并初始化包含目标 (dest) 文件名称和所有源文件 (source) 的变量。

接下来,我们将创建目标流:

const destStream = createWriteStream(dest)

现在,是时候初始化源流了:

let endCount = 0
for (const source of sources) {
    const sourceStream = createReadStream(source, { highWaterMark: 16 })
    sourceStream.on('end', () => {
        if (++endCount === sources.length) {
            destStream.end()
            console.log(`${dest} created`)
        }
    })
    sourceStream
        .pipe(split((line) => line + '\n'))
        .pipe(destStream, { end: false })
}

在前面的代码中,我们为每个源文件创建了一个可读流。 然后,对于每个源流,我们附加一个结束侦听器,只有当所有文件都已完全读取时,该侦听器才会终止目标流。 最后,我们将每个源流通过管道传输到 split(),这是一个 Transform 流,确保我们为每一行文本生成一个块,最后,我们将结果通过管道传输到目标流。 这是真正的合并发生的时候。 我们将多个源流传输到一个目的地。

我们现在可以使用以下命令执行此代码:

node merge-lines.js <destination> <source1> <source2> <source3> ...

如果您使用足够大的文件运行此代码,您会注意到目标文件将包含从所有源文件中随机混合的行(16 字节的低 highWaterMark 使此属性更加明显)。 这种行为在某些类型的对象流和某些按行分割的文本流(如我们当前的示例中)中是可以接受的,但在处理大多数二进制流时通常是不可取的。

该模式的一种变体允许我们按顺序合并流; 它包括一个接一个地使用源流。 当前一个结束时,下一个开始发出块(这就像连接所有源的输出)。 与往常一样,在 npm 上,我们可以找到一些也可以处理这种情况的包。 其中之一是多流( https://npmjs.org/package/multistream )。

复用和解复用

合并流模式有一个特殊的变体,在该变体中,我们实际上并不希望将多个流连接在一起,而是使用共享通道来传递一组流的数据。 这是概念上不同的操作,因为源流在共享通道内保持逻辑分离,这允许我们在数据到达共享通道的另一端时再次分割流。 图 6.9 阐明了这种情况:

image 2024 05 07 11 06 19 946
Figure 4. 图 6.9:复用和解复用流

组合多个流(在这种情况下,也称为通道)以允许通过单个流进行传输的操作称为复用,而相反的操作,即根据从共享流接收的数据重建原始流,称为解复用。 执行这些操作的设备分别称为复用器(或 mux)和解复用器(或 demux)。 这是计算机科学和电信领域广泛研究的领域,因为它是几乎所有类型的通信媒体(例如电话、广播、电视,当然还有互联网本身)的基础之一。 对于本书的范围,我们不会进行过多的解释,因为这是一个很大的主题。

我们想要在本节中演示的是如何使用共享 Node.js 流来传输多个逻辑上分离的流,然后这些流在共享流的另一端再次分离。

构建远程记录器

让我们用一个例子来推动我们的讨论。 我们需要一个小程序来启动子进程并将其标准输出和标准错误重定向到远程服务器,而远程服务器又将两个流保存在两个单独的文件中。 因此,在这种情况下,共享介质是 TCP 连接,而要复用的两个通道是子进程的 stdout 和 stderr。 我们将利用一种称为数据包交换的技术,该技术与 IP、TCP 和 UDP 等协议使用的技术相同。 数据包交换涉及将数据包装成数据包,使我们能够指定对多路复用、路由、控制流、检查损坏数据等有用的各种元信息。 我们在示例中实现的协议非常简约。 我们将数据包装成简单的数据包,如图 6.10 所示:

image 2024 05 07 11 07 20 461
Figure 5. 图 6.10:远程记录器数据包的字节结构

如图 6.10 所示,数据包包含实际数据,但也包含标头(通道 ID + 数据长度),这使得区分每个流的数据并使解复用器能够将数据包路由到正确的通道。

客户端——多路复用

让我们开始从客户端构建我们的应用程序。 凭借大量的创造力,我们将模块命名为 client.js。 这代表应用程序中负责启动子进程并复用其流的部分。

那么,让我们从定义模块开始。 首先,我们需要一些依赖项:

import { fork } from 'child_process'
import { connect } from 'net'

现在,让我们实现一个执行源列表多路复用的函数:

function multiplexChannels (sources, destination) {
    let openChannels = sources.length
    for (let i = 0; i < sources.length; i++) {
        sources[i]
            .on('readable', function () { // (1)
                let chunk
                while ((chunk = this.read()) !== null) {
                    const outBuff = Buffer.alloc(1 + 4 + chunk.length) // (2)
                    outBuff.writeUInt8(i, 0)
                    outBuff.writeUInt32BE(chunk.length, 1)
                    chunk.copy(outBuff, 5)
                    console.log(`Sending packet to channel: ${i}`)
                    destination.write(outBuff) // (3)
                }
            })
            .on('end', () => { // (4)
                if (--openChannels === 0) {
                    destination.end()
                }
            })
    }
}

MultiplexChannels() 函数接收要多路复用的源流和目标通道作为输入,然后执行以下步骤:

  1. 对于每个源流,它为可读事件注册一个侦听器,我们在其中读取 使用非流动模式从流中获取数据。

  2. 当读取一个块时,我们将其包装成一个数据包,其中按顺序包含 1 个字节 (UInt8) 表示通道 ID、4 个字节 (UInt32BE) 表示数据包大小,然后是实际数据。

  3. 当数据包准备好后,我们将其写入目标流。

  4. 最后,我们为结束事件注册一个监听器,以便我们可以在所有源流结束时终止目标流。

我们的协议能够复用多达 256 个不同的源流,因为我们只有 1 个字节来标识通道。

现在,我们客户端的最后一部分变得非常简单:

const socket = connect(3000, () => { // (1)
    const child = fork( // (2)
        process.argv[2],
        process.argv.slice(3),
        { silent: true })
    multiplexChannels([child.stdout, child.stderr], socket) // (3)
}

在最后一个代码片段中,我们执行以下操作:

  1. 我们创建一个到地址 localhost:3000 的新 TCP 客户端连接。

  2. 我们使用第一个命令行参数作为路径来启动子进程,同时提供 process.argv 数组的其余部分作为子进程的参数。 我们指定选项 {silent: true},这样子进程就不会继承父进程的 stdout 和 stderr。

  3. 最后,我们获取子进程的 stdout 和 stderr,并使用 mutiplexChannels() 函数将它们多路复用到套接字的可写流中。

服务器端——解复用

现在,我们可以负责创建应用程序的服务器端 (server.js),在其中我们对来自远程连接的流进行多路分离,并将它们通过管道传输到两个不同的文件中。

让我们首先创建一个名为 demultiplexChannel() 的函数:

import { createWriteStream } from 'fs'
import { createServer } from 'net'

function demultiplexChannel (source, destinations) {
    let currentChannel = null
    let currentLength = null

    source
        .on('readable', () => { // (1)
            let chunk
            if (currentChannel === null) { // (2)
                chunk = source.read(1)
                currentChannel = chunk && chunk.readUInt8(0)
            }

            if (currentLength === null) { // (3)
                chunk = source.read(4)
                currentLength = chunk && chunk.readUInt32BE(0)
                if (currentLength === null) {
                    return null
                }
            }

            chunk = source.read(currentLength) // (4)
            if (chunk === null) {
                return null
            }

            console.log(`Received packet from: ${currentChannel}`)
            destinations[currentChannel].write(chunk) // (5)
            currentChannel = null
            currentLength = null
        })
        .on('end', () => { // (6)
            destinations.forEach(destination => destination.end())
            console.log('Source channel closed')
        })
}

前面的代码可能看起来很复杂,但事实并非如此。 借助 Node.js Readable Streams 的特性,我们可以轻松实现小协议的解复用,如下所示:

  1. 我们开始使用非流模式从流中读取数据。

  2. 首先,如果我们还没有读取通道ID,我们尝试从流中读取1个字节,然后将其转换为数字。

  3. 下一步是读取数据的长度。 为此,我们需要 4 个字节,因此内部缓冲区中可能没有足够的数据(即使不太可能),这将导致 this.read() 调用返回 null。 在这种情况下,我们只需中断解析并在下一个可读事件时重试。

  4. 当我们最终也可以读取数据大小时,我们知道要从内部缓冲区中提取多少数据,因此我们尝试全部读取。

  5. 当我们读取所有数据时,我们可以将其写入正确的目标通道,确保我们重置了 currentChannel 和 currentLength 变量(这些将用于解析下一个数据包)。

  6. 最后,我们确保当源通道结束时也结束所有目标通道。

现在我们可以对源流进行多路分解,让我们的新函数发挥作用:

const server = createServer((socket) => {
    const stdoutStream = createWriteStream('stdout.log')
    const stderrStream = createWriteStream('stderr.log')
    demultiplexChannel(socket, [stdoutStream, stderrStream])
})
server.listen(3000, () => console.log('Server started'))

在上面的代码中,我们首先在端口3000上启动一个TCP服务器; 然后,对于我们收到的每个连接,我们创建两个指向两个不同文件的可写流:一个用于标准输出,另一个用于标准错误。 这些是我们的目的地渠道。 最后,我们使用 de MultiplexChannel() 将套接字流解复用为 stdoutStream 和 stderrStream。

运行复用器/解复用器应用程序

现在,我们准备尝试新的 mux/demux 应用程序,但首先,让我们创建一个小型 Node.js 程序来生成一些示例输出; 我们称之为 generate-data.js:

console.log('out1')
console.log('out2')
console.error('err1')
console.log('out3')
console.error('err2')

好的; 现在,我们准备尝试我们的远程日志记录应用程序。 首先,我们启动服务器:

node server.js

然后,我们将通过提供要作为子进程启动的文件来启动客户端:

node client.js generateData.js

客户端几乎会立即运行,但在该过程结束时,generate-data.js 应用程序的标准输入和标准输出将通过一个 TCP 连接传输,然后在服务器上被多路分解为两个单独的文件 。

请注意,由于我们使用 child_process.fork() (nodejsdp.link/fork),我们的客户端将只能启动其他 Node.js 模块。

多路复用和多路分解对象流

我们刚刚展示的示例演示了如何多路复用和多路分解二进制/文本流,但值得一提的是,相同的规则也适用于对象流。 最大的区别是,当使用对象时,我们已经有了一种使用原子消息(对象)传输数据的方法,因此多路复用就像在每个对象中设置 channelID 属性一样简单。 多路分解只涉及读取 channelID 属性并将每个对象路由到正确的目标流。

仅涉及解复用的另一种模式是根据某些条件路由来自源的数据。 通过这种模式,我们可以实现复杂的流程,如图 6.11 所示:

image 2024 05 07 11 17 07 293
Figure 6. 图 6.11:解复用对象流

图 6.11 中的系统中使用的多路分配器获取代表动物的对象流,并根据动物的类别将它们每个分配到正确的目标流:爬行动物、两栖动物或哺乳动物。

使用相同的原理,我们也可以为流实现 if…​else 语句。 为了获得一些灵感,请查看 ternary-stream 包 (nodejsdp.link/ternary-stream),它允许我们做到这一点。