发现流的重要性

在 Node.js 等基于事件的平台中,处理 I/O 的最有效方法是实时处理,即在输入可用时立即使用输入,并在应用程序生成输出后立即发送输出。

在本节中,我们将初步介绍 Node.js 流及其优势。 请记住,这只是一个概述,本章稍后将详细分析如何使用和组合流。

缓冲与流式传输

到目前为止,我们在本书中看到的几乎所有异步 API 都使用缓冲模式工作。 对于输入操作,缓冲模式会导致来自资源的所有数据被收集到缓冲区中,直到操作完成; 然后它作为一个数据块传递回调用者。 下图显示了此范例的直观示例:

image 2024 05 07 08 12 08 167
Figure 1. 图 6.1:缓冲

在图6.1中,我们可以看到,在时间 t1,从资源接收到一些数据并将其保存到缓冲区中。 在时间 t2 处,接收到另一个数据块(最后一个数据块),该数据块完成了读取操作,因此在 t3 处,整个缓冲区被发送给消费者。

另一方面,流允许我们在数据从资源到达后立即对其进行处理。 如下图所示:

image 2024 05 07 08 12 50 639
Figure 2. 图 6.2:流媒体

这次,图 6.2 向您展示了,一旦从资源接收到每个新数据块,它就会立即传递给消费者,消费者现在有机会立即处理它,而无需等待所有数据都在缓冲区中收集完毕。

但这两种方法有什么区别呢? 纯粹从效率的角度来看,流在空间(内存使用)和时间(计算时钟时间)方面都可以更加高效。 然而,Node.js 流还有另一个重要优势:可组合性。 现在让我们看看这些属性对我们设计和编写应用程序的方式有什么影响。

空间效率

首先,流允许我们做一些缓冲数据和一次性处理无法做到的事情。例如,考虑一下我们必须读取一个非常大的文件的情况,比方说,几百兆字节甚至几千兆字节的文件。显然,使用在完全读取文件后返回一个大缓冲区的 API 并不是一个好主意。试想一下,如果同时读取几个这样的大文件,我们的应用程序很容易就会耗尽内存。此外,V8 中的缓冲区大小有限。你不能分配超过几千兆字节的数据,因此我们可能会在物理内存耗尽之前就碰壁。

缓冲区的实际最大大小随平台和 Node.js 版本的不同而变化。 如果您想知道给定平台中的字节限制是多少,您可以运行以下代码:

import buffer from 'buffer'
console.log(buffer.constansts.MAX_LENGTH)

使用缓冲 API 进行 Gzip 压缩

为了举一个具体的例子,让我们考虑一个使用 GZIP 格式压缩文件的简单命令行应用程序。 使用缓冲 API,此类应用程序在 Node.js 中将如下所示(为简洁起见,省略了错误处理):

import { promises as fs } from 'fs'
import { gzip } from 'zlib'
import { promisify } from 'util'
const gzipPromise = promisify(gzip)

const filename = process.argv[2]

async function main () {
    const data = await fs.readFile(filename)
    const gzippedData = await gzipPromise(data)
    await fs.writeFile(`${filename}.gz`, gzippedData)
    console.log('File successfully compressed')
}

main()

现在,我们可以尝试将上述代码放入名为 gzip-buffer.js 的文件中,然后使用以下命令运行它:

node gzip-buffer.js <path to file>

如果我们选择一个足够大的文件(例如,大约 8 GB),我们很可能会收到一条错误消息,指出我们尝试读取的文件大于允许的最大缓冲区大小:

RangeError [ERR_FS_FILE_TOO_LARGE]: File size (8130792448) is greater
than possible Buffer: 2147483647 bytes

这正是我们所期望的,这表明我们使用了错误的方法。

使用流进行 Gzip 压缩

修复 Gzip 应用程序并使其能够处理大文件的最简单方法是使用流 API。 让我们看看如何实现这一目标。 让我们用以下代码编写一个新模块:

// gzip-stream.js
import { createReadStream, createWriteStream } from 'fs'
import { createGzip } from 'zlib'

const filename = process.argv[2]

createReadStream(filename)
    .pipe(createGzip())
    .pipe(createWriteStream(`${filename}.gz`))
    .on('finish', () => console.log('File successfully compressed'))

“是这样吗?” 你可能会问。 是的! 正如我们所说,流因其接口和可组合性而令人惊叹,从而允许干净、优雅和简洁的代码。 我们稍后会更详细地看到这一点,但现在,需要认识到的重要一点是,该程序将针对任何大小的文件以及恒定的内存利用率顺利运行。 自己尝试一下(但考虑到压缩大文件可能需要一段时间)。

请注意,在前面的示例中,为了简洁起见,我们省略了错误处理。 我们将在本章后面讨论流的正确错误处理的细微差别。 在此之前,请注意大多数示例都缺乏适当的错误处理。

时间效率

现在让我们考虑一个应用程序的情况,该应用程序压缩文件并将其上传到远程 HTTP 服务器,而远程 HTTP 服务器又将其解压缩并将其保存在文件系统上。 如果我们应用程序的客户端组件是使用缓冲 API 实现的,则只有在读取并压缩整个文件后才会开始上传。 另一方面,只有当所有数据都收到后,服务器才会开始解压缩。 实现相同结果的更好解决方案涉及使用流。 在客户端计算机上,流允许我们在从文件系统读取数据块后立即压缩和发送数据块,而在服务器上,它们允许我们在从远程对等点接收到每个数据块后立即对其进行解压缩。 让我们通过从服务器端开始构建前面提到的应用程序来演示这一点。

让我们创建一个名为 gzip-receive.js 的模块,其中包含以下代码:

import { createServer } from 'http'
import { createWriteStream } from 'fs'
import { createGunzip } from 'zlib'
import { basename, join } from 'path'

const server = createServer((req, res) => {
    const filename = basename(req.headers['x-filename'])
    const destFilename = join('received_files', filename)
    console.log(`File request received: ${filename}`)
    req
        .pipe(createGunzip())
        .pipe(createWriteStream(destFilename))
        .on('finish', () => {
            res.writeHead(201, { 'Content-Type': 'text/plain' })
            res.end('OK\n')
            console.log(`File saved: ${destFilename}`)
        })
})

server.listen(3000, () => console.log('Listening on http://localhost:3000'))

在前面的示例中,req 是一个流对象,服务器使用它从网络接收分块的请求数据。 感谢 Node.js 流,每块数据在收到后都会被解压缩并保存到磁盘。

您可能已经注意到,在我们的服务器应用程序中,我们使用 basename() 从接收到的文件的名称中删除任何可能的路径。 这是一种安全最佳实践,因为我们希望确保接收到的文件准确保存在我们的 receive_files 文件夹中。 如果没有 basename(),恶意用户就可以制作一个请求,该请求可以有效地覆盖系统文件并将恶意代码注入服务器计算机。 例如,想象一下,如果文件名设置为 /usr/bin/node 会发生什么? 在这种情况下,攻击者可以有效地用任何任意文件替换我们的 Node.js 解释器。

我们应用程序的客户端将进入一个名为 gzip-send.js 的模块,如下所示:

import { request } from 'http'
import { createGzip } from 'zlib'
import { createReadStream } from 'fs'
import { basename } from 'path'

const filename = process.argv[2]
const serverHost = process.argv[3]

const httpRequestOptions = {
    hostname: serverHost,
    port: 3000,
    path: '/',
    method: 'PUT',
    headers: {
        'Content-Type': 'application/octet-stream',
        'Content-Encoding': 'gzip',
        'X-Filename': basename(filename)
    }
}

const req = request(httpRequestOptions, (res) => {
    console.log(`Server response: ${res.statusCode}`)
})

createReadStream(filename)
    .pipe(createGzip())
    .pipe(req)
    .on('finish', () => {
        console.log('File successfully sent')
    })

在前面的代码中,我们再次使用流从文件中读取数据,然后在从文件系统读取数据后立即压缩并发送每个块。

现在,为了尝试该应用程序,我们首先使用以下命令启动服务器:

node gzip-receive.js

然后,我们可以通过指定要发送的文件和服务器地址(例如 localhost)来启动客户端:

node gzip-send.js <path to file> localhost

如果我们选择足够大的文件,我们就可以了解数据如何从客户端流向服务器。 但究竟为什么这种范例(我们拥有流动数据)比使用缓冲 API 更有效? 图 6.3 应该使这个概念更容易理解:

image 2024 05 07 08 24 14 587
Figure 3. 图 6.3:缓冲和流式传输比较

处理文件时,它会经历多个连续的阶段:

  1. 【客户端】从文件系统读取

  2. 【客户端】压缩数据

  3. 【客户端】发送到服务器

  4. 【服务器】从客户端接收

  5. 【服务器】解压数据

  6. 【服务器】将数据写入磁盘

为了完成加工,我们必须像流水线一样,依次经过各个阶段,直到最后。 在图 6.3 中,我们可以看到,使用缓冲 API,该过程是完全顺序的。 要压缩数据,我们首先必须等待整个文件被读取,然后,要发送数据,我们必须等待整个文件被读取和压缩,等等。

使用流,一旦我们收到第一个数据块,装配线就会启动,而无需等待整个文件被读取。 但更神奇的是,当下一个数据块可用时,无需等待上一组任务完成; 相反,另一条装配线同时启动。 这非常有效,因为我们执行的每个任务都是异步的,因此可以通过 Node.js 并行化。 唯一的限制是必须保留块到达每个阶段的顺序。 Node.js 流的内部实现负责为我们维护顺序。

从图6.3中我们可以看到,使用流的结果是整个过程花费的时间更少,因为我们没有浪费时间等待所有数据一次被读取和处理。

可组合性

通过 pipeline() 方法,我们可以将不同的处理单元连接起来,每个处理单元负责一个功能,这就是完美的 Node.js 风格。之所以能做到这一点,是因为流具有统一的接口,而且它们在 API 方面可以相互理解。唯一的先决条件是,流水线中的下一个流必须支持前一个流产生的数据类型,可以是二进制、文本,甚至是对象,我们将在本章后面看到。

要看看此属性的强大功能的另一个演示,我们可以尝试向我们之前构建的 gzip-send/gzip-receive 应用程序添加一个加密层。

为此,我们需要对客户端和服务器进行一些小的更改。

添加客户端加密

我们先从客户端开始:

// ...
import { createCipheriv, randomBytes } from 'crypto' // (1)
const filename = process.argv[2]
const serverHost = process.argv[3]
const secret = Buffer.from(process.argv[4], 'hex') // (2)
const iv = randomBytes(16) // (3)
// ...

让我们回顾一下这里所做的更改:

  1. 首先,我们从 crypto 模块导入 createCipheriv() Transform 流和 randomBytes() 函数。

  2. 我们从命令行获取服务器的加密密钥。 我们希望该字符串作为十六进制字符串传递,因此我们读取该值并使用设置为十六进制模式的缓冲区将其加载到内存中。

  3. 最后,我们生成一个随机字节序列,将其用作文件加密的初始化向量。

现在,我们可以更新负责创建 HTTP 请求的代码段:

const httpRequestOptions = {
    hostname: serverHost,
    headers: {
        'Content-Type': 'application/octet-stream',
        'Content-Encoding': 'gzip',
        'X-Filename': basename(filename),
        'X-Initialization-Vector': iv.toString('hex') // (1)
    }
}

// ...

const req = request(httpRequestOptions, (res) => {
    console.log(`Server response: ${res.statusCode}`)
})

createReadStream(filename)
    .pipe(createGzip())
    .pipe(createCipheriv('aes192', secret, iv)) // (2)
    .pipe(req)
// ...

这里的主要变化是:

  1. 我们将初始化向量作为 HTTP 标头传递到服务器。

  2. 我们在 Gzip 阶段之后对数据进行加密。

这就是客户端的全部内容。

添加服务器端解密

现在让我们重构服务器。 我们需要做的第一件事是从核心加密模块导入一些实用函数,我们可以用它们来生成随机加密密钥(秘密):

// ...
import { createDecipheriv, randomBytes } from 'crypto'
const secret = randomBytes(24)
console.log(`Generated secret: ${secret.toString('hex')}`)

生成的秘密以十六进制字符串的形式打印到控制台,以便我们可以与客户共享。

现在,我们需要更新文件接收逻辑:

const server = createServer((req, res) => {
    const filename = basename(req.headers['x-filename'])
    const iv = Buffer.from(
        req.headers['x-initialization-vector'], 'hex') // (1)
    const destFilename = join('received_files', filename)
    console.log(`File request received: ${filename}`)
    req
        .pipe(createDecipheriv('aes192', secret, iv)) // (2)
        .pipe(createGunzip())
        .pipe(createWriteStream(destFilename))
        // ...

在这里,我们应用了两项更改:

  1. 我们必须读取客户端发送的加密初始化向量(nodejsdp.link/iv)。

  2. 我们的流管道的第一步现在负责使用加密模块中的 createDecipheriv Transform 流来解密传入的数据。

只需很少的努力(只需几行代码),我们就为我们的应用程序添加了一个加密层; 我们只需使用一些已经可用的转换流(createCipherivcreateDecipheriv)并将它们包含在客户端和服务器的流处理管道中。 以类似的方式,我们可以添加和组合其他流,就像我们在玩乐高积木一样。

这种方法的主要优点是可重用性,但正如我们从到目前为止的代码中看到的,流还使代码变得更干净、更模块化。 由于这些原因,流通常不仅用于处理纯 I/O,还用作简化和模块化代码的手段。

现在我们已经介绍了流,我们准备以更结构化的方式探索 Node.js 中可用的不同类型的流。