流入门

在上一节中,我们了解了流如此强大的原因,同时还了解到从 Node.js 的核心模块开始,流在 Node.js 中无处不在。例如,我们看到 fs 模块有用于读取文件的 createReadStream() 和用于写入文件的 createWriteStream(),HTTP 请求和响应对象本质上就是流,zlib 模块允许我们使用流接口压缩和解压数据,最后,甚至 crypto 模块也公开了一些有用的流原语,如 createCipheriv 和 createDecipheriv。

现在我们知道为什么流如此重要,让我们退后一步,开始更详细地探讨它们。

流的剖析

Node.js 中的每个流都是流核心模块中可用的四个基本抽象类之一的实现:

  • Readable

  • Writable

  • Duplex

  • Transform

每个流类也是 EventEmitter 的一个实例。 事实上,流可以产生多种类型的事件,例如当可读流完成读取时结束,当可写流完成写入时结束,或者当出现问题时出错。

流如此灵活的原因之一是它们不仅可以处理二进制数据,还可以处理几乎任何 JavaScript 值。 事实上,它们支持两种操作模式:

  • 二进制模式:以块的形式传输数据,例如缓冲区或字符串

  • 对象模式:以离散对象序列的形式传输数据(允许我们使用几乎任何 JavaScript 值)

这两种操作模式使我们不仅可以将流用于 I/O,而且还可以将其作为一种工具,以函数式方式优雅地组合处理单元,正如我们将在本章后面看到的那样。

让我们通过介绍 Readable 流类开始深入研究 Node.js 流。

可读流

可读流代表数据源。 在 Node.js 中,它是使用 Readable 抽象类实现的,该类在 Stream 模块中可用。

从流中读取

有两种方法可以从可读流接收数据:非流动(或暂停)和流动。 让我们更详细地分析这些模式。

暂停模式

非流动或暂停模式是从可读流读取的默认模式。 它涉及将一个侦听器附加到可读事件的流,该事件表示可以读取新数据的可用性。 然后,在一个循环中,我们不断地读取数据,直到内部缓冲区被清空。 这可以使用 read() 方法来完成,该方法同步从内部缓冲区读取并返回表示数据块的 Buffer 对象。 read() 方法具有以下签名:

readable.read([size])

使用这种方法,可以根据需要从流中强制提取数据。

为了展示它是如何工作的,让我们创建一个名为 read-stdin.js 的新模块,它实现一个简单的程序,从标准输入(也是可读流)读取并将所有内容回显到标准输出:

process.stdin
    .on('readable', () => {
        let chunk
        console.log('New data available')
        while ((chunk = process.stdin.read()) !== null) {
            console.log(
                `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
            )
        }})
    .on('end', () => console.log('End of stream'))

read() 方法是一个同步操作,它从可读流的内部缓冲区中提取数据块。 如果流在二进制模式下工作,则默认情况下返回的块是 Buffer 对象。

在以二进制模式工作的可读流中,我们可以通过在流上调用 setEncoding(encoding) 并提供有效的编码格式(例如 utf8)来读取字符串而不是缓冲区。 当流式传输 UTF-8 文本数据时,建议使用此方法,因为流将正确处理多字节字符,并进行必要的缓冲以确保没有字符最终被分割成单独的块。 换句话说,流生成的每个块都将是有效的 UTF-8 字节序列。

请注意,即使您已经开始使用流中的数据,您也可以在 Readable 流上根据需要多次调用 setEncoding() 。 编码将在下一个可用块上动态切换。 流本质上是二进制的; 编码只是流发出的二进制数据的视图。

数据仅从 Readable 侦听器中读取,一旦新数据可用,就会调用该侦听器。 当内部缓冲区中没有更多可用数据时,read() 方法返回 null; 在这种情况下,我们必须等待另一个可读事件被触发,告诉我们可以再次读取,或者等待表明流结束的结束事件。 当流以二进制模式工作时,我们还可以通过将大小值传递给 read() 方法来指定我们有兴趣读取特定数量的数据。 这在实现网络协议或解析特定数据格式时特别有用。

现在,我们准备运行 read-stdin.js 模块并对其进行实验。 让我们在控制台中输入一些字符,然后按 Enter 键以查看回显到标准输出中的数据。 为了终止流并生成一个优雅的结束事件,我们需要插入一个 EOF(文件结束)字符(在 Windows 上使用 Ctrl + Z,在 Linux 和 macOS 上使用 Ctrl + D)。

我们还可以尝试将我们的程序与其他进程连接起来。 这可以使用管道运算符 (|) 来实现,它将一个程序的标准输出重定向到另一个程序的标准输入。 例如,我们可以运行如下命令:

cat <path to a file> | node read-stdin.js

这是一个令人惊叹的演示,展示了流范例如何成为一个通用接口,使我们的程序能够进行通信,无论它们是用什么语言编写的。

流动模式

从流中读取数据的另一种方法是将侦听器附加到 data 事件。 这会将流切换到使用流动模式,其中数据不是使用 read() 提取的,而是在数据到达时立即推送到数据侦听器。 例如,我们之前创建的 read-stdin.js 应用程序在使用流动模式时将如下所示:

process.stdin
    .on('data', (chunk) => {
        console.log('New data available')
        console.log(
            `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
        )
    })
    .on('end', () => console.log('End of stream'))

与非流动模式相比,流动模式在控制数据流方面的灵活性较低。 流的默认操作模式是非流动的,因此要启用流动模式,需要将侦听器附加到 data 事件或显式调用 resume() 方法。 要暂时停止流发出数据事件,我们可以调用 pause() 方法,使所有传入数据缓存在内部缓冲区中。 调用 pause() 会将流切换回非流动模式。

异步迭代

可读流也是异步迭代器; 因此,我们可以重写 read-stdin.js 示例,如下所示:

async function main () {
    for await (const chunk of process.stdin) {
        console.log('New data available')
        console.log(
            `Chunk read (${chunk.length} bytes): "${chunk.toString()}"`
        )
    }
    console.log('End of stream')
}

main()

我们将在第 9 章 “行为设计模式” 中更详细地讨论异步迭代器,因此现在不必太担心前面示例中的语法。 重要的是要知道,如果您需要编写一个消耗整个 Readable 流并返回 Promise 的函数,那么这种语法可能会非常方便。

实现可读流

现在我们知道如何从流中读取数据,下一步是学习如何实现新的自定义可读流。 为此,需要通过从流模块继承原型 Readable 来创建一个新类。 具体流必须提供 _read() 方法的实现,该方法具有以下签名:

readable._read(size)

Readable 类的内部将调用 _read() 方法,该方法又将开始使用 push() 填充内部缓冲区:

readable.push(chunk)

请注意,read() 是由流消费者调用的方法,而 _read() 是由流子类实现的方法,不应该直接调用。 下划线通常表示该方法不是公共的,不应直接调用。

为了演示如何实现新的可读流,我们可以尝试实现一个生成随机字符串的流。 让我们创建一个名为 random-stream.js 的新模块,其中包含随机字符串生成器的代码:

import { Readable } from 'stream'
import Chance from 'chance'

const chance = new Chance()

export class RandomStream extends Readable {
    constructor (options) {
        super(options)
        this.emittedBytes = 0
    }

    _read (size) {
        const chunk = chance.string({ length: size }) // (1)
        this.push(chunk, 'utf8') // (2)
        this.emittedBytes += chunk.length
        if (chance.bool({ likelihood: 5 })) { // (3)
            this.push(null)
        }
    }
}

在文件的顶部,我们加载依赖项。 那里没有什么特别的,除了我们正在加载一个名为 opportunity (nodejsdp.link/chance) 的 npm 模块,它是一个用于生成各种随机值的库,从数字到字符串再到整个句子。

下一步是创建一个名为 RandomStream 的新类,它将 Readable 指定为其父类。 在前面的代码中,在 RandomStream 构造函数中调用 super(options) 将调用父类的构造函数,从而允许我们初始化流的内部状态。

如果您有一个仅调用 super(options) 的构造函数,则可以将其删除,因为您将继承父构造函数。 只是要小心,记住每次需要编写自定义构造函数时都要调用 super(options)。

可以通过选项(options)对象传递的可能参数包括以下内容:

  • 编码(encoding)参数,用于将缓冲区转换为字符串(默认为 null)

  • 启用对象模式的标志(objectMode,默认为 false)

  • 内部缓冲区中存储数据的上限,超过此上限后,将不再从源代码中读取数据(高水位,highWaterMark,默认为 16KB)。

好的,现在我们来解释一下 _read() 方法:

  1. 该方法利用 chance 生成一个长度等于 size 的随机字符串。

  2. 将字符串推入内部缓冲区。 请注意,由于我们正在推送字符串,因此我们还需要指定编码 utf8(如果块只是一个二进制缓冲区,则没有必要)。

  3. 它通过向内部缓冲区推送 null 值来随机终止数据流(可能性为 5%),以表示 EOF 情况,或者换句话说,表示数据流的结束。

请注意,_read() 函数中的 size 参数是一个建议参数。 最好遵守它并仅推送调用者请求的数据量,尽管这不是强制性的。

当我们调用 push() 时,我们应该检查它是否返回 false。 当发生这种情况时,意味着接收流的内部缓冲区已达到 highWaterMark 限制,我们应该停止向其中添加更多数据。 这称为背压,我们将在本章的下一节中更详细地讨论它。

RandomStream 就这样了,我们现在可以使用它了。 让我们看看如何实例化 RandomStream 对象并从中提取一些数据:

// index.js
import { RandomStream } from './random-stream.js'

const randomStream = new RandomStream()
randomStream
    .on('data', (chunk) => {
        console.log(`Chunk received (${chunk.length} bytes): ${chunk.toString()}`)
    })
    .on('end', () => {
        console.log(`Produced ${randomStream.emittedBytes} bytes of random data`)
    })

现在,一切准备就绪,我们可以尝试新的自定义流了。 只需像往常一样执行 index.js 模块,然后观察屏幕上流动的一组随机字符串。

简化构造器方法

对于简单的自定义流,我们可以通过使用 Readable 流的简化构造方法来避免创建自定义类。 使用这种方法,我们只需要调用 new Readable(options) 并在选项集中传递一个名为 read() 的方法。 这里的 read() 方法与我们在类扩展方法中看到的 _read() 方法具有完全相同的语义。 让我们使用简化的构造函数方法重写 RandomStream:

import { Readable } from 'stream'
import Chance from 'chance'

const chance = new Chance()
let emittedBytes = 0

const randomStream = new Readable({
    read (size) {
        const chunk = chance.string({ length: size })
        this.push(chunk, 'utf8')
        emittedBytes += chunk.length
        if (chance.bool({ likelihood: 5 })) {
            this.push(null)
        }
    }
})

// now use randomStream instance directly ...

当你不需要管理复杂的状态时,这种方法会特别有用,而且可以利用更简洁的语法。在前面的示例中,我们创建了自定义流的单个实例。如果我们想采用简化的构造函数方法,但又需要创建自定义流的多个实例,我们可以将初始化逻辑封装在一个工厂函数中,然后多次调用该函数来创建这些实例。

可迭代的可读流

您可以使用 Readable.from() 帮助器轻松地从数组或其他可迭代对象(即生成器、迭代器和异步迭代器)创建 Readable 流实例。

为了熟悉这个助手,让我们看一个简单的示例,其中我们将数据从数组转换为可读流:

import { Readable } from 'stream'

const mountains = [
    { name: 'Everest', height: 8848 },
    { name: 'K2', height: 8611 },
    { name: 'Kangchenjunga', height: 8586 },
    { name: 'Lhotse', height: 8516 },
    { name: 'Makalu', height: 8481 }
]

const mountainsStream = Readable.from(mountains)
mountainsStream.on('data', (mountain) => {
    console.log(`${mountain.name.padStart(14)}\t${mountain.height}m`)
})

正如我们从这段代码中看到的,Readable.from() 方法使用起来非常简单:第一个参数是一个可迭代实例(在我们的例子中是 mountain 数组)。 Readable.from() 接受一个附加的可选参数,可用于指定流选项,例如 objectMode。

请注意,我们不必显式地将 objectMode 设置为 true。 默认情况下,Readable.from() 会将 objectMode 设置为 true,除非通过将其设置为 false 来明确选择退出。 流选项可以作为第二个参数传递给函数。

运行前面的代码将产生以下输出:

Everest        8848m
K2             8611m
Kangchenjunga  8586m
Lhotse         8516m
Makalu         8481m

尽量不要在内存中实例化大型数组。想象一下,在前面的例子中,我们想列出世界上所有的山脉。世界上大约有 100 万座山,如果我们将所有的山都装入一个数组,就会分配大量内存。即使我们随后通过可读取数据流读取数组中的数据,所有数据也已经预先加载,因此我们实际上是在浪费数据流的内存效率。我们可以使用本地流(如 fs.createReadStream)、创建自定义流,或者使用 Readable.from 和懒惰迭代器(如生成器、迭代器或异步迭代器)来实现这一点。我们将在第 9 章 “行为设计模式” 中看到后一种方法的一些示例。

可写流

可写流代表数据目的地。 例如,想象一下文件系统上的文件、数据库表、套接字、标准错误或标准输出接口。 在 Node.js 中,它是使用 Writable 抽象类实现的,该类在 Stream 模块中可用。

写入流

将一些数据推送到可写流是一件简单的事情; 我们所要做的就是使用 write() 方法,该方法具有以下签名:

writable.write(chunk, [encoding], [callback])

编码参数是可选的,如果 chunk 是字符串,则可以指定(默认为 utf8,如果 chunk 是缓冲区,则忽略)。 另一方面,回调函数在块刷新到底层资源时被调用,并且也是可选的。

为了表明不再有数据写入流中,我们必须使用 end() 方法:

writable.end([chunk], [encoding], [callback])

我们可以通过 end() 方法提供最终的数据块; 在这种情况下,回调函数相当于向 finish 事件注册一个侦听器,当流中写入的所有数据都已刷新到底层资源时会触发该侦听器。

现在,让我们通过创建一个输出随机字符串序列的小型 HTTP 服务器来展示其工作原理:

// entropy-server.js
import { createServer } from 'http'
import Chance from 'chance'

const chance = new Chance()
const server = createServer((req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' }) // (1)
    while (chance.bool({ likelihood: 95 })) { // (2)
        res.write(`${chance.string()}\n`) // (3)
    }
    res.end('\n\n') // (4)
    res.on('finish', () => console.log('All data sent')) // (5)
})
server.listen(8080, () => {
    console.log('listening on http://localhost:8080')
})

我们创建的 HTTP 服务器写入 res 对象,该对象是 http.ServerResponse 的实例,也是一个可写流。 发生的情况解释如下:

  1. 我们首先写入 HTTP 响应的头部。 请注意,writeHead() 不是 Writable 接口的一部分; 事实上,它是 http.ServerResponse 类公开的一个辅助方法,特定于 HTTP 协议。

  2. 我们启动一个以 5% 的可能性终止的循环(我们指示 opportunity.bool() 在 95% 的情况下返回 true)。

  3. 在循环内,我们将随机字符串写入流中。

  4. 一旦退出循环,我们就在流上调用 end(),表示不会再写入任何数据。 此外,我们还提供了一个包含两个换行符的最终字符串,在结束流之前将其写入流中。

  5. 最后,我们为 finish 事件注册一个侦听器,当所有数据都刷新到底层套接字时,该侦听器将被触发。

要测试服务器,我们可以在地址 http://localhost:8080 打开浏览器或从终端使用 curl,如下所示:

curl localhost:8080

此时,服务器应开始向您选择的 HTTP 客户端发送随机字符串(请注意,某些浏览器可能会对数据进行缓冲,因此流式行为可能并不明显)。

背压

与真实管道系统中流动的液体类似,Node.js 流也可能遇到瓶颈,即数据写入速度快于流消耗数据的速度。 处理这个问题的机制涉及缓冲传入的数据; 但是,如果流不向输入器提供任何反馈,我们可能会遇到越来越多的数据积累在内部缓冲区中的情况,从而导致不期望的内存使用水平。

为了防止这种情况发生,当内部缓冲区超过 highWaterMark 限制时,writable.write() 将返回 false。 在可写流中,highWaterMark 属性是内部缓冲区大小的限制,超出该限制 write() 方法开始返回 false,指示应用程序现在应该停止写入。 当缓冲区清空时,会发出 drain 事件,表明可以安全地再次开始写入。 这种机制称为背压。

背压是一种咨询机制。 即使 write() 返回 false,我们也可以忽略该信号并继续写入,从而使缓冲区无限增长。 当达到 highWaterMark 阈值时,流不会自动阻塞; 因此,建议始终保持警惕并遵循背压。

本节中描述的机制同样适用于可读流。 事实上,Readable 流中也存在背压,当 _read() 内部调用的 push() 方法返回 false 时就会触发背压。 然而,这是流实现者特有的问题,因此我们通常不必频繁地处理它。

我们可以通过修改我们之前创建的 entropy-server.js 模块来快速演示如何考虑可写流的背压:

// ...
const server = createServer((req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' })
    function generateMore () { // (1)
        while (chance.bool({ likelihood: 95 })) {
            const randomChunk = chance.string({ // (2)
                length: (16 * 1024) - 1
            })
            const shouldContinue = res.write(`${randomChunk}\n`) // (3)
            if (!shouldContinue) {
                console.log('back-pressure')
                return res.once('drain', generateMore)
            }
        }
        res.end('\n\n')
    }
    generateMore()
    res.on('finish', () => console.log('All data sent'))
})
// ...

前面代码中最重要的步骤可以总结如下:

  1. 我们将主要逻辑包装在一个名为 generateMore() 的函数中。

  2. 为了增加接收一些背压的机会,我们将数据块的大小增加到 16 KB 减去 1 字节,这非常接近默认的 highWaterMark 限制。

  3. 写入一块数据后,我们检查 res.write() 的返回值。 如果我们收到 false,则意味着内部缓冲区已满,我们应该停止发送更多数据。 发生这种情况时,我们退出该函数,并使用 generateMore() 注册另一个写入周期,以便在发出 drain 事件时使用。

如果我们现在尝试再次运行服务器,然后使用 curl 生成客户端请求,则很可能会出现一些背压,因为服务器以非常高的速率生成数据,比底层套接字可以处理的速度更快。

实现可写流

我们可以通过继承 Writable 类并提供 _write() 方法的实现来实现一个新的 Writable 流。 让我们尝试立即执行,同时讨论细节。

让我们构建一个可写流来接收以下格式的对象:

{
    path: <path to a file>
    content: <string or buffer>
}

对于这些对象中的每一个,我们的流都必须将内容属性保存到在给定路径创建的文件中。 我们可以立即看到流的输入是对象,而不是字符串或缓冲区。 这意味着我们的流必须在对象模式下工作。

我们将该模块称为 to-file-stream.js:

import { Writable } from 'stream'
import { promises as fs } from 'fs'
import { dirname } from 'path'
import mkdirp from 'mkdirp-promise'

export class ToFileStream extends Writable {
    constructor (options) {
        super({ ...options, objectMode: true })
    }

    _write (chunk, encoding, cb) {
        mkdirp(dirname(chunk.path))
            .then(() => fs.writeFile(chunk.path, chunk.content))
            .then(() => cb())
            .catch(cb)
    }
}

我们为新流创建了一个新类,它扩展了流模块中的 Writable。

我们必须调用父构造函数来初始化其内部状态; 我们还需要确保选项(options)对象指定流在对象模式下工作(objectMode: true)。 Writable 接受的其他选项如下:

  • highWaterMark(默认为 16 KB):这控制背压限制。

  • decodeStrings(默认为 true):这可以在将字符串传递给 _write() 方法之前自动将其解码到二进制缓冲区中。 在对象模式下该选项被忽略。

最后,我们提供了 _write() 方法的实现。 正如您所看到的,该方法接受一个数据块和一个编码(只有当我们处于二进制模式并且流选项 decodeStrings 设置为 false 时才有意义)。 此外,该方法接受一个回调函数(cb),该函数需要在操作完成时调用; 没有必要传递操作结果,但如果需要,我们仍然可以传递一个错误,该错误将导致流发出 error 事件。

现在,为了尝试我们刚刚构建的流,我们可以创建一个新模块并对流执行一些写入操作:

import { join } from 'path'
import { ToFileStream } from './to-file-stream.js'
const tfs = new ToFileStream()

tfs.write({
    path: join('files', 'file1.txt'), content: 'Hello' })
tfs.write({
    path: join('files', 'file2.txt'), content: 'Node.js' })
tfs.write({
    path: join('files', 'file3.txt'), content: 'streams' })
tfs.end(() => console.log('All files created'))

在这里,我们创建并使用了第一个自定义可写流。 像往常一样运行新模块并检查其输出。 您将看到执行后,将在名为 files 的新文件夹中创建三个新文件。

简单构造器方法

正如我们在可读流中看到的那样,可写流还提供了简化的构造机制。 如果我们使用可写流的简化构造来重写 ToFileStream,它将如下所示:

// ...
const tfs = new Writable({
    objectMode: true,
    write (chunk, encoding, cb) {
        mkdirp(dirname(chunk.path))
            .then(() => fs.writeFile(chunk.path, chunk.content))
            .then(() => cb())
            .catch(cb)
    }
})
// ...

通过这种方法,我们只需使用 Writable 构造函数并传递一个 write() 函数来实现 Writable 实例的自定义逻辑。 请注意,使用这种方法时, write() 函数的名称中没有下划线。 我们还可以传递其他构造选项,例如 objectMode。

双工流

双工流是既可读又可写的流。 当我们想要描述既是数据源又是数据目的地的实体(例如网络套接字)时,它很有用。 双工流继承两个流的方法。 可读且可流写,所以这对我们来说并不新鲜。 这意味着我们可以 read() 或 write() 数据,或者监听 read 和 dry 事件。

要创建自定义双工流,我们必须为 _read() 和 _write() 提供实现。 传递给 Duplex() 构造函数的选项对象在内部转发给 Readable 和 Writable 构造函数。 这些选项与我们在前面几节中讨论过的选项相同,但添加了一个名为allowHalfOpen(默认为 true)的新选项,如果设置为 false,将导致流的两个部分(可读和可写)结束 如果只有其中一个这样做的话。

双向流是一种既可读又可写的流。当我们要描述一个既是数据源又是数据目的地的实体(例如网络套接字)时,它非常有用。双向流同时继承了 stream.Readable 和 stream.Writable 的方法。因此,这对我们来说并不新鲜。这意味着我们可以 read()或 write() 数据,或同时监听 readable 和 drain 事件。

要创建自定义 Duplex 流,我们必须为 _read()_write() 提供一个实现。传递给 Duplex() 构造函数的选项对象会在内部转发给 Readable 和 Writable 构造函数。这些选项与我们在前几节中讨论过的选项相同,但增加了一个名为 allowHalfOpen(默认为 true)的新选项,如果将其设置为 false,则如果只有一个选项打开,则流的两个部分(Readable 和 Writable)都将结束。

如果我们需要一个 Duplex 流一侧工作在对象模式,另一侧工作在二进制模式,我们可以独立使用 readableObjectModewritableObjectMode 选项。

转换流

transform 流是一种特殊的 Duplex 流,专门设计用于处理数据转换。 仅举几个具体示例,我们在本章开头讨论的函数 zlib.createGzip() 和 crypto.createCipheriv() 分别创建用于压缩和加密的 Transform 流。

在简单的双工流中,从流中读取的数据与写入其中的数据之间没有直接关系(至少流对这种关系是不可知的)。 考虑一下 TCP 套接字,它只向远程对等点发送和接收数据; 套接字不知道输入和输出之间的任何关系。 图 6.4 说明了双工流中的数据流:

image 2024 05 07 09 30 28 245
Figure 1. 图 6.4:双工流示意图

另一方面,转换流对从可写端接收的每个数据块应用某种转换,然后使转换后的数据在其可读端可用。 图 6.5 显示了数据在 Transform 流中的流动方式:

image 2024 05 07 09 31 07 055
Figure 2. 图 6.5:变换流示意图

从外部看,Transform 流的接口与 Duplex 流的接口完全相同。 然而,当我们想要构建一个新的 Duplex 流时,我们必须提供 _read()_write() 方法,而为了实现一个新的 Transform 流,我们必须填写另一对方法: _transform()_flush()

让我们通过示例了解如何创建新的 Transform 流。

实现转换流

让我们实现一个转换流来替换所有出现的给定字符串。 为此,我们必须创建一个名为replaceStream.js 的新模块。 让我们直接跳到实现:

import { Transform } from 'stream'

export class ReplaceStream extends Transform {
    constructor (searchStr, replaceStr, options) {
        super({ ...options })
        this.searchStr = searchStr
        this.replaceStr = replaceStr
        this.tail = ''
    }

    _transform (chunk, encoding, callback) {
        const pieces = (this.tail + chunk).split(this.searchStr) // (1)
        const lastPiece = pieces[pieces.length - 1] // (2)
        const tailLen = this.searchStr.length - 1
        this.tail = lastPiece.slice(-tailLen)
        pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
        this.push(pieces.join(this.replaceStr)) // (3)
        callback()
    }

    _flush (callback) {
        this.push(this.tail)
        callback()
    }
}

在此示例中,我们创建了一个扩展 Transform 基类的新类。 该类的构造函数接受三个参数:searchStr、replaceStr 和 options。 正如您可以想象的那样,它们允许我们定义要匹配的文本和要用作替换的字符串,以及用于底层 Transform 流的高级配置的选项对象。 我们还初始化了一个内部 tail 变量,稍后将由 _transform() 方法使用。

现在,我们来分析 _transform() 方法,它是我们新类的核心。 _transform() 方法实际上与可写流的 _write() 方法具有相同的签名,但它不是将数据写入底层资源,而是使用 this.push() 将其推送到内部读取缓冲区,就像我们所做的那样 在可读流的 _read() 方法中执行操作。 这显示了变换流的两侧是如何连接的。

ReplaceStream 的 _transform() 方法实现了我们算法的核心。 在缓冲区中搜索并替换字符串是一项简单的任务; 然而,当数据流式传输时,情况就完全不同了,并且可能的匹配可能分布在多个块中。 代码遵循的过程可以解释如下:

  1. 我们的算法使用 searchStr 作为分隔符来分割内存中的数据(尾部数据和当前块)。

  2. 然后,它获取该操作生成的数组的最后一项,并提取最后的 searchString.length - 1 个字符。 结果保存在 tail 变量中,并将添加到下一个数据块的前面。

  3. 最后,使用 replaceStr 作为分隔符将 split() 产生的所有片段连接在一起,并推入内部缓冲区。

当流结束时,tail 变量中可能仍有一些内容未推送到内部缓冲区中。 这正是 _flush() 方法的用途; 它是在流结束之前调用的,这是我们最后一次机会在完全结束流之前完成流或推送任何剩余数据。

_flush() 方法仅接受回调,我们必须确保在所有操作完成时调用该回调,从而导致流终止。 至此,我们就完成了 ReplaceStream 类。

现在,是时候尝试新的流了。 让我们创建一个脚本,将一些数据写入流中,然后读取转换后的结果:

import { ReplaceStream } from './replace-stream.js'

const replaceStream = new ReplaceStream('World', 'Node.js')
replaceStream.on('data', chunk => console.log(chunk.toString()))

replaceStream.write('Hello W')
replaceStream.write('orld!')
replaceStream.end()

为了让我们的流的生活变得更加困难,我们将搜索词(即 World)分布在两个不同的块上,然后,使用流动模式,我们从同一个流中读取,记录每个转换后的块。 运行上述程序应产生以下输出:

Hel
lo Node.js
!

请注意,前面的输出被分成多行,因为我们使用 console.log() 来打印它。 这使我们能够证明我们的实现能够正确替换字符串匹配,即使匹配文本跨越多个数据块也是如此。

简单构造函数

毫不奇怪,甚至 Transform 流也支持简化的构造。 此时,我们应该已经对这个 API 的外观有了一种直觉,所以让我们立即动手并使用这种方法重写前面的示例:

const searchStr = 'World'
const replaceStr = 'Node.js'
let tail = ''

const replaceStream = new Transform({
    defaultEncoding: 'utf8',

    transform (chunk, encoding, cb) {
        const pieces = (tail + chunk).split(searchStr)
        const lastPiece = pieces[pieces.length - 1]
        const tailLen = searchStr.length - 1
        tail = lastPiece.slice(-tailLen)
        pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
        this.push(pieces.join(replaceStr))
        cb()
    },

    flush (cb) {
        this.push(tail)
        cb()
    }
})
// now write to replaceStream ...

正如预期的那样,简化的构造是通过直接实例化一个新的 Transform 对象并直接通过 options 对象将我们的特定转换逻辑传递给 Transform() 和 flush() 函数来实现的。 请注意,transform() 和 flush() 此处没有前置下划线。

使用转换流过滤和聚合数据

正如我们在上一节中提到的,转换流是实现数据转换管道的完美构建块。 在上一节中,我们演示了一个可以替换文本流中的单词的转换流的示例。 但转换流也可用于实现其他类型的数据转换。 例如,使用转换流来实现数据过滤和数据聚合是很常见的。

举个实际例子,假设一家财富 500 强公司要求我们分析一个包含 2020 年所有销售额的大文件。该公司希望我们使用 CSV 格式的销售报告 data.csv,计算在意大利的销售总利润。

为简单起见,我们假设存储在 CSV 文件中的销售数据每行包含三个字段:商品类型、销售国家/地区和利润。 因此,这样的文件可能如下所示:

type,country,profit
Household,Namibia,597290.92
Baby Food,Iceland,808579.10
Meat,Russia,277305.60
Meat,Italy,413270.00
Cereal,Malta,174965.25
Meat,Indonesia,145402.40
Household,Italy,728880.54
[... many more lines]

现在,很明显,我们必须找到所有以 “意大利” 作为国家/地区的记录,并在此过程中将匹配行的利润值汇总为一个数字。

为了以流式处理方式处理 CSV 文件,我们可以使用优秀的 csv-parse 模块 (nodejsdp.link/csv-parse)。

如果我们暂时假设我们已经实现了自定义流来过滤和聚合数据,那么此任务的可能解决方案可能如下所示:

import { createReadStream } from 'fs'
import parse from 'csv-parse'
import { FilterByCountry } from './filter-by-country.js'
import { SumProfit } from './sum-profit.js'

const csvParser = parse({ columns: true })

createReadStream('data.csv') // (1)
    .pipe(csvParser) // (2)
    .pipe(new FilterByCountry('Italy')) // (3)
    .pipe(new SumProfit()) // (4)
    .pipe(process.stdout) // (5)

这里提出的流管道包含五个步骤:

  1. 我们将源 CSV 文件作为流读取。

  2. 我们使用 csv-parse 库将文档的每一行解析为 CSV 记录。 对于每一行,该流将发出一个包含属性类型、国家/地区和利润的对象。

  3. 我们按国家/地区过滤所有记录,仅保留与国家/地区 “意大利” 匹配的记录。 所有不匹配 “Italy” 的记录都会被删除,这意味着它们不会被传递到管道中的其他步骤。 请注意,这是我们必须实现的自定义转换流之一。

  4. 对于每条记录,我们都会累积利润。 该流最终将发出一个字符串,它代表在意大利销售的产品的总利润值。 仅当原始文件中的所有数据均已完全处理时,流才会发出该值。 请注意,这是我们必须实现才能完成该项目的第二个自定义转换流。

  5. 最后,上一步发出的数据显示在标准输出中。

现在,让我们实现 FilterByCountry 流:

import { Transform } from 'stream'

export class FilterByCountry extends Transform {
    constructor (country, options = {}) {
        options.objectMode = true
        super(options)
        this.country = country
    }

    _transform (record, enc, cb) {
        if (record.country === this.country) {
            this.push(record)
        }
        cb()
    }
}

FilterByCountry 是自定义 Transform 流。 我们可以看到构造函数接受一个名为 “国家/地区” 的参数,它允许我们指定要过滤的国家/地区名称。 在构造函数中,我们还将流设置为以 objectMode 运行,因为我们知道它将用于处理对象(CSV 文件中的记录)。

_transform 方法中,我们检查当前记录的国家/地区是否与构造时指定的国家/地区匹配。 如果匹配,那么我们通过调用 this.push() 将记录传递到管道的下一阶段。 无论记录是否匹配,我们都需要调用 cb() 来指示当前记录已成功处理,并且流已准备好接收另一条记录。

模式:转换过滤器

有条件地调用 this.push(),只允许部分数据到达管道的下一阶段。

最后,让我们实现 SumProfit 过滤器:

import { Transform } from 'stream'

export class SumProfit extends Transform {
    constructor (options = {}) {
        options.objectMode = true
        super(options)
        this.total = 0
    }

    _transform (record, enc, cb) {
        this.total += Number.parseFloat(record.profit)
        cb()
    }

    _flush (cb) {
        this.push(this.total.toString())
        cb()
    }
}

该流也需要在 objectMode 下运行,因为它将接收表示来自 CSV 文件的记录的对象。 请注意,在构造函数中,我们还初始化了一个名为 Total 的实例变量,并将其值设置为 0。

_transform() 方法中,我们处理每条记录并使用当前利润值来增加总计。 值得注意的是,这一次我们没有调用 this.push()。 这意味着数据流经流时不会发出任何值。 不过,我们仍然需要调用 cb() 来指示当前记录已被处理并且流已准备好接收另一条记录。

为了在处理完所有数据后发出最终结果,我们必须使用 _flush() 方法定义自定义刷新行为。 在这里,我们最终调用 this.push() 来发出结果总值的字符串表示形式。 请记住,_flush() 在流关闭之前会自动调用。

模式:流式聚合

使用 _transform() 处理数据并累积部分结果,然后仅在 _flush() 方法中调用 this.push() 以在处理完所有数据后发出结果。

这样就完成了我们的示例。 现在,您可以从代码存储库中获取 CSV 文件并执行此程序以查看意大利的总利润是多少。 毫不奇怪,这将是一大笔钱,因为我们谈论的是财富 500 强公司的利润!

直通(PassThrough)流

还有第五种类型的流值得一提:PassThrough。 这种类型的流是一种特殊类型的变换,它输出每个数据块而不应用任何变换。

PassThrough 可能是最被低估的流类型,但实际上在某些情况下它可以成为我们工具带中非常有价值的工具。 例如,PassThrough 流对于可观察性或实现延迟管道和惰性流模式很有用。

可观察性

如果我们想要观察有多少数据流经一个或多个流,我们可以通过将数据事件侦听器附加到 PassThrough 实例,然后将该实例传输到流管道的给定点来实现。 让我们看一个简化的例子来理解这个概念:

import { PassThrough } from 'stream'

let bytesWritten = 0
const monitor = new PassThrough()
monitor.on('data', (chunk) => {
    bytesWritten += chunk.length
})
monitor.on('finish', () => {
    console.log(`${bytesWritten} bytes written`)
})

monitor.write('Hello!')
monitor.end()

在此示例中,我们创建 PassThrough 的新实例并使用 data 事件来计算流经流的字节数。 我们还使用 finish 事件将总量转储到控制台。 最后,我们使用 write() 和 end() 将一些数据直接写入流中。 这只是一个说明性的例子; 在更现实的场景中,我们将在流管道的给定点中通过管道传输监视器实例。 例如,如果我们想在本章的第一个文件压缩示例中监视有多少字节写入磁盘,我们可以通过执行以下操作轻松实现:

createReadStream(filename)
    .pipe(createGzip())
    .pipe(monitor)
    .pipe(createWriteStream(`${filename}.gz`))

这种方法的优点在于,我们无需触动流水线中的任何其他现有流,因此,如果我们需要观察流水线的其他部分(例如,假设我们想知道未压缩数据的字节数),我们只需花很少的力气就可以移动监视器。

请注意,您可以使用自定义转换流来实现 monitor 流的另一个版本。在这种情况下,您必须确保接收到的数据块在不做任何修改或延迟的情况下被推送,而 PassThrough 流会自动帮您做到这一点。这两种方法同样有效,因此请选择您感觉更自然的方法。

后期管道

在某些情况下,我们可能必须使用接受流作为输入参数的 API。 这通常不是什么大问题,因为我们已经知道如何创建和使用流。 然而,如果我们想要通过流读取或写入的数据在调用给定的 API 之后可用,那么情况可能会变得更加复杂。

为了更具体地形象化这个场景,我们假设我们必须使用一个 API,它为我们提供以下函数来将文件上传到数据存储服务:

function upload (filename, contentStream) {
    // ...
}

此功能实际上是 Amazon Simple Storage Service (S3) 或 Azure Blob Storage 服务等文件存储服务 SDK 中常用功能的简化版本。 通常,这些库将为用户提供更灵活的功能,可以接收不同格式的内容数据(例如,字符串、缓冲区或可读流)。

现在,如果我们想从文件系统上传文件,这是一个简单的操作,我们可以这样做:

import { createReadStream } from 'fs'
upload('a-picture.jpg', createReadStream('/path/to/a-picture.jpg'))

但是如果我们想在上传之前对文件流进行一些处理怎么办? 例如,假设我们要压缩或加密数据? 另外,如果我们必须在调用上传函数后异步执行此转换怎么办?

在这种情况下,我们可以向 upload() 函数提供 PassThrough 流,它将有效地充当占位符。 upload() 的内部实现将立即尝试使用其中的数据,但在我们实际写入之前,流中不会有可用的数据。 此外,在我们关闭流之前,该流不会被视为完成,因此 upload() 函数必须等待数据流过 PassThrough 实例才能启动上传。

让我们看看一个可能的命令行脚本,它使用这种方法从文件系统上传文件,并使用 Brotli 压缩来压缩它。 我们假设第三方 upload() 函数在名为 upload.js 的文件中提供:

import { createReadStream } from 'fs'
import { createBrotliCompress } from 'zlib'
import { PassThrough } from 'stream'
import { basename } from 'path'
import { upload } from './upload.js'

const filepath = process.argv[2] // (1)
const filename = basename(filepath)
const contentStream = new PassThrough() // (2)

upload(`${filename}.br`, contentStream) // (3)
    .then((response) => {
        console.log(`Server response: ${response.data}`)
    })
    .catch((err) => {
        console.error(err)
        process.exit(1)
    })

createReadStream(filepath) // (4)
    .pipe(createBrotliCompress())
    .pipe(contentStream)

在本书的存储库中,您将找到此示例的完整实现,它允许您将文件上传到可以在本地运行的 HTTP 服务器。

让我们回顾一下前面的示例中发生的情况:

  1. 我们从第一个命令行参数获取要上传的文件的路径,并使用 basename 从给定路径推断出文件名。

  2. 我们为内容流创建一个占位符作为 PassThrough 实例。

  3. 现在,我们通过传递文件名(添加了 .br 后缀,表明它使用 Brotli 压缩)和占位符内容流来调用上传函数。

  4. 最后,我们通过链接文件系统可读流、Brotli 压缩转换流以及最后作为目标的内容流来创建管道。

执行此代码时,一旦我们调用 upload() 函数(可能建立到远程服务器的连接),上传就会开始,但数据只会在管道初始化后才开始流动。 请注意,当处理完成时,我们的管道还将关闭 contentStream,这将向 upload() 函数指示所有内容已完全使用。

模式

当您需要为将来读取或写入的数据提供占位符时,请使用 PassThrough 流。

我们还可以使用这种模式来改造 upload() 函数的接口。 我们可以让它返回一个 Writeable 流,而不是接受 Readable 流作为输入,然后可以使用它来提供我们想要上传的数据:

function createUploadStream (filename) {
    // ...
    // returns a writable stream that can be used to upload data
}

如果我们的任务是实现这个函数,我们可以通过使用 PassThrough 实例以一种非常优雅的方式实现这一点,如下面的示例实现所示:

function createUploadStream (filename) {
    const connector = new PassThrough()
    upload(filename, connector)
    return connector
}

在前面的代码中,我们使用 PassThrough 流作为连接器。 对于库的使用者可以在任何后续阶段写入数据的情况,该流成为完美的抽象。

然后可以按如下方式使用 createUploadStream() 函数:

const upload = createUploadStream('a-file.txt')
upload.write('Hello World')
upload.end()

本书的存储库还包含一个采用这种替代模式的 HTTP 上传示例。

延迟流

有时,我们需要同时创建大量流,例如将它们传递给函数进行进一步处理。 一个典型的例子是使用 archiver (nodejsdp.link/archiver),这是一个用于创建 TAR 和 ZIP 等档案的包。 归档程序包允许您从一组流创建归档,代表要添加的文件。 问题是,如果我们想要传递大量流,例如来自文件系统上的文件,我们可能会收到 EMFILE,打开文件过多错误。 这是因为每次创建新流时,fs 模块中的 createReadStream() 等函数实际上都会打开一个文件描述符,甚至在您开始从这些流中读取数据之前也是如此。

用更通用的术语来说,创建流实例可能会立即初始化昂贵的操作(例如,打开文件或套接字、初始化与数据库的连接等),甚至在我们实际开始使用此类流之前也是如此。 如果您要创建大量流实例以供以后使用,这可能并不理想。

在这些情况下,您可能希望延迟昂贵的初始化,直到实际需要使用流中的数据为止。

可以通过使用像 lazystream(nodejsdp.link/lazystream)这样的库来实现这一点。 该库允许您有效地为实际流实例创建代理,其中直到某些代码实际开始使用来自代理的数据时才会创建代理实例。

在下面的示例中,lazystream 允许我们为特殊的 Unix 文件 /dev/urandom 创建一个惰性可读流:

import lazystream from 'lazystream'
const lazyURandom = new lazystream.Readable(function (options) {
    return fs.createReadStream('/dev/urandom')
})

我们作为参数传递给 new lazystream.Readable() 的函数实际上是一个工厂函数,它在必要时生成代理流。

在幕后,lazystream 是使用 PassThrough 流实现的,仅当第一次调用其 _read() 方法时,才会通过调用工厂函数创建代理实例,并将生成的流通过管道传输到 PassThrough 本身。 使用流的代码完全不知道此处发生的代理,并且它将使用数据,就好像数据直接来自 PassThrough 流一样。 lazystream 也实现了类似的实用程序来创建惰性可写流。

从头开始创建惰性可读和可写流可能是留给您的一项有趣的练习。 如果您遇到困难,请查看 lazystream 的源代码,以获取有关如何实现此模式的灵感。

在下一节中,我们将更详细地讨论 .pipe() 方法,并了解连接不同流以形成处理管道的其他方法。

使用管道连接流

Unix 管道的概念是由 Douglas Mcllroy 发明的。 这使得一个程序的输出能够连接到下一个程序的输入。 看看下面的命令:

echo Hello World! | sed s/World/Node.js/g

在前面的命令中,echo 将写入 Hello World! 到其标准输出,然后重定向到 sed 命令的标准输入(感谢管道 | 运算符)。 然后,sed 将任何出现的 World 替换为 Node.js,并将结果打印到其标准输出(这次是控制台)。

类似地,Node.js 流可以使用 Readable 流的 pipeline() 方法来连接,该方法具有以下接口:

readable.pipe(writable, [options])

非常直观地,pipe() 方法获取从可读流发出的数据并将其泵送到提供的可写流中。 此外,当可读流发出结束事件时,可写流会自动结束(除非我们指定 {end: false} 作为选项)。 pipeline() 方法返回第一个参数中传递的可写流,如果这样的流也是可读的(例如 Duplex 或 Transform 流),则允许我们创建链式调用。

将两个流连接在一起会产生吸力,这使得数据自动流向可写流,因此不需要调用 read() 或 write(),但最重要的是,不再需要控制背压,因为 它会自动得到处理。

为了提供一个简单的示例,我们可以创建一个新模块,该模块从标准输入获取文本流,应用前面构建自定义 ReplaceStream 时讨论的替换转换,然后将数据推送回标准输出:

// replace.js
import { ReplaceStream } from './replace-stream.js'

process.stdin
    .pipe(new ReplaceStream(process.argv[2], process.argv[3]))
    .pipe(process.stdout)

前面的程序将来自标准输入的数据传输到 ReplaceStream 的实例中,然后返回到标准输出。 现在,为了尝试这个小应用程序,我们可以利用 Unix 管道将一些数据重定向到其标准输入,如以下示例所示:

echo Hello World! | node replace.js World Node.js

这应该产生以下输出:

Hello Node.js!

这个简单的示例演示了流(特别是文本流)是一个通用接口,而管道是几乎神奇地组合和互连所有这些接口的方式。

管道和错误处理

使用 pipeline() 时,错误事件不会通过管道自动传播。 以以下代码片段为例:

stream1
    .pipe(stream2)
    .on('error', () => {})

在前面的管道中,我们将仅捕获来自 Stream2 的错误,这是我们附加侦听器的流。 这意味着,如果我们想要捕获从 stream1 生成的任何错误,我们必须直接附加另一个错误侦听器,这将使我们的示例如下所示:

stream1
    .on('error', () => {})
    .pipe(stream2)
    .on('error', () => {})

这显然并不理想,尤其是在处理具有大量步骤的管道时。 更糟糕的是,如果发生错误,失败的流只会从管道中取消传输。 失败的流未被正确销毁,这可能会留下悬空资源(例如文件描述符、连接等)并泄漏内存。 前面的代码片段的更健壮(但不优雅)的实现可能如下所示:

function handleError (err) {
    console.error(err)
    stream1.destroy()
    stream2.destroy()
}
stream1
    .on('error', handleError)
    .pipe(stream2)
    .on('error', handleError)

在此示例中,我们为 stream1 和 stream2 的错误事件注册了一个处理程序。 当错误发生时,我们的 handleError() 函数被调用,我们可以记录错误并销毁管道中的每个流。 这使我们能够确保所有分配的资源都得到正确释放,并且错误得到妥善处理。

使用 pipeline() 更好地处理错误

在管道中手动处理错误不仅麻烦,而且容易出错——如果可以的话,我们绝对应该避免这种情况!

幸运的是,核心流包为我们提供了一个出色的实用函数,可以使构建管道成为一种更安全、更愉快的实践,这就是 pipeline() 辅助函数。

简而言之,您可以按如下方式使用 pipeline() 函数:

pipeline(stream1, stream2, stream3, ... , cb)

该助手将参数列表中传递的每个流传输到下一个流。 对于每个流,它还将注册一个适当的错误并关闭侦听器。 这样,当管道成功完成或因错误而中断时,所有流都会被正确销毁。 最后一个参数是一个可选的回调,将在流完成时调用。 如果由于错误而完成,则将以给定错误作为第一个参数来调用回调。

为了使用这个助手进行一些练习,让我们编写一个简单的命令行脚本来实现以下管道:

  • 从标准输入读取 Gzip 数据流

  • 解压缩数据

  • 将所有文本变为大写

  • 对结果数据进行 Gzip 压缩

  • 将数据发送回标准输出

我们将此模块称为 uppercasify-gzipped.js:

import { createGzip, createGunzip } from 'zlib' // (1)
import { Transform, pipeline } from 'stream'

const uppercasify = new Transform({ // (2)
    transform (chunk, enc, cb) {
        this.push(chunk.toString().toUpperCase())
        cb()
    }
})

pipeline( // (3)
    process.stdin,
    createGunzip(),
    uppercasify,
    createGzip(),
    process.stdout,
    (err) => { // (4)
        if (err) {
            console.error(err)
            process.exit(1)
        }
    }
)

在此示例中:

  1. 我们从 zlib 和流模块导入必要的依赖项。

  2. 我们创建一个简单的 Transform 流,使每个块都大写。

  3. 我们定义管道,在其中按顺序列出所有流实例。

  4. 我们添加一个回调来监视流的完成情况。 如果发生错误,我们会在标准错误接口中打印错误,并以错误代码 1 退出。

管道将通过消耗标准输入中的数据并为标准输出生成数据来自动启动。

我们可以使用以下命令测试我们的脚本:

echo 'Hello World!' | gzip | node uppercasify-gzipped.js | gunzip

这应该产生以下输出:

HELLO WORLD!

如果我们尝试从前面的命令序列中删除 gzip 步骤,我们的脚本将失败并出现类似于以下内容的错误:

Error: unexpected end of file
at Zlib.zlibOnError [as onerror] (zlib.js:180:17) {
errno: -5,
code: 'Z_BUF_ERROR'
}

此错误是由使用 createGunzip() 函数创建的流引发的,该函数负责解压缩数据。 如果数据实际上没有经过 gzip 压缩,则解压缩算法将无法处理数据并且会失败。 在这种情况下, pipeline() 将在错误发生后负责清理工作并销毁管道中的所有流。

通过使用 core util 模块中的 promisify() 帮助器,可以轻松地 promisify pipeline() 函数。

现在我们已经对 Node.js 流有了深入的了解,我们准备好进入一些更复杂的流模式,例如控制流和高级管道模式。