回调模式

回调是 Reactor 模式(上一章已介绍)处理程序的具体化。回调是赋予 Node.js 独特编程风格的印记之一。

回调是调用来传播操作结果的函数,而这正是我们处理异步操作时所需的。在异步世界中,回调取代了返回指令的使用,而返回指令总是同步执行的。JavaScript 是实现回调的理想语言,因为函数是一等对象,可以轻松赋值给变量、作为参数传递、从另一个函数调用中返回或存储在数据结构中。实现回调的另一种理想结构是闭包。通过闭包,我们可以引用创建函数时的环境;这样,无论在何时何地调用回调,我们都可以始终保持请求异步操作时的上下文。

如果您需要刷新有关闭包的知识,可以参考 MDN Web Docs 上的文章: nodejsdp.link/mdn-closures。

在本节中,我们将分析这种特殊的编程风格,它使用回调而不是返回指令。

连续传递风格

在 JavaScript 中,回调是一个函数,它作为参数传递给另一个函数,并在操作完成后调用其结果。在函数式编程中,这种传播结果的方式被称为延续传递方式(continuation-passing style,CPS)。

这是一个通用概念,并不总是与异步操作相关。 事实上,它只是表明通过将结果传递给另一个函数(回调)来传播结果,而不是直接将其返回给调用者。

同步CPS

为了阐明这个概念,我们来看一个简单的同步函数:

function add (a, b) {
    return a + b
}

如果您想知道,这里没有什么特别的事情发生。 使用返回指令将结果传回调用者。 这也称为直接样式,它代表同步编程中返回结果的最常见方式。

上述函数的等效 CPS 如下:

function addCps (a, b, callback) {
    callback(a + b)
}

addCps() 函数是同步 CPS 函数。 它是同步的,因为只有当回调也完成执行时它才会完成执行。 下面的代码演示了这个语句:

console.log('before')
addCps(1, 2, result => console.log(`Result: ${result}`))
console.log('after')

由于 addCps() 是同步的,前面的代码将简单地打印以下内容:

before
Result: 3
after

现在,让我们看看异步 CPS 是如何工作的。

异步CPS

让我们考虑 addCps() 函数是异步的情况:

function additionAsync (a, b, callback) {
    setTimeout(() => callback(a + b), 100)
}

在前面的代码中,我们使用 setTimeout() 来模拟回调的异步调用。 setTimeout() 将一个任务添加到事件队列中,并在给定的毫秒数后执行。 这显然是一个异步操作。 现在,让我们尝试使用 additionAsync() 并看看操作顺序如何变化:

console.log('before')
additionAsync(1, 2, result => console.log(`Result: ${result}`))
console.log('after')

前面的代码将打印以下内容:

before
after
Result: 3

由于 setTimeout() 触发异步操作,因此它不会等待回调执行; 相反,它立即返回,将控制权交还给 additionAsync(),然后再次交还给其调用者。 Node.js 中的这个属性至关重要,因为一旦发送异步请求,它就会将控制权交还给事件循环,从而允许处理队列中的新事件。

图 3.1 显示了其工作原理:

image 2024 05 06 11 09 48 200
Figure 1. 图 3.1:异步函数调用的控制流程

当异步操作完成时,执行会从导致暂停的异步函数提供的回调函数重新开始。这个重新开始的执行是从事件循环开始的,因此它拥有全新的栈空间。这正是 JavaScript 非常方便的地方。得益于闭包,即使回调函数在不同时间点和不同位置被调用,维护异步函数调用者的上下文也变得轻而易举。

总结一下,同步函数会阻塞,直到其操作完成。异步函数会立即返回,其结果会在事件循环的后续周期内传递给一个处理程序(在本例中,是回调函数)。

非 CPS 回调

在某些情况下,回调参数的存在可能会让我们认为函数是异步的或正在使用 CPS。 这并不总是正确的。 我们以 Array 对象的 map() 方法为例:

const result = [1, 5, 7].map(element => element - 1)
console.log(result) // [0, 4, 6]

显然,回调仅用于迭代数组的元素,而不是传递操作结果。 事实上,结果是使用直接方式同步返回的。 非 CPS 回调和 CPS 回调之间没有语法差异。 因此,回调的意图应该在 API 文档中明确说明。

在下一节中,我们将讨论每个 Node.js 开发人员都应该注意的回调最重要的陷阱之一。

同步还是异步?

正如我们所见,函数的性质(同步或异步)会彻底改变指令的执行顺序。这会对整个应用程序的流程产生重大影响,无论是正确性还是效率方面。下面我们将分析这两种范式及其缺陷。总而言之,我们应该避免让 API 的性质(同步或异步)前后不一致,因为这样会导致一系列问题,这些问题可能非常难以检测和重现。为了进行分析,我们将以一个异步行为不一致的函数为例。

不可预测的函数

最危险的情况之一是 API 在某些条件下同步运行,而在其他条件下异步运行。 我们以下面的代码为例:

import { readFile } from 'fs'

const cache = new Map()

function inconsistentRead (filename, cb) {
    if (cache.has(filename)) {
        // invoked synchronously
        cb(cache.get(filename))
    } else {
        // asynchronous function
        readFile(filename, 'utf8', (err, data) => {
            cache.set(filename, data)
            cb(data)
        })
    }
}

该函数使用缓存映射来存储不同文件读取操作的结果。请注意,这只是一个示例;它没有错误管理,缓存逻辑本身也不是最优的(在第 11 章 “高级配方” 中,您将学习如何正确处理异步缓存)。但除此之外,前面的函数是危险的,因为它在第一次读取文件并设置缓存时表现为异步,但是一旦文件内容已经在缓存中,它对所有后续请求都变为同步。

Unleashing Zalgo

现在,让我们讨论一下使用不可预测的函数(例如我们刚刚定义的函数)如何轻松破坏应用程序。 考虑以下代码:

function createFileReader (filename) {
    const listeners = []
    inconsistentRead(filename, value => {
        listeners.forEach(listener => listener(value))
    })

    return {
        onDataReady: listener => listeners.push(listener)
    }
}

当调用前面的函数时,它会创建一个充当通知程序的新对象,允许我们为文件读取操作设置多个侦听器。 当读取操作完成且数据可用时,将立即调用所有侦听器。 前面的函数使用我们的 inconfirmedRead() 函数来实现此功能。 让我们看看如何使用 createFileReader() 函数:

const reader1 = createFileReader('data.txt')
reader1.onDataReady(data => {
    console.log(`First call data: ${data}`)

    // ...sometime later we try to read again from
    // the same file
    const reader2 = createFileReader('data.txt')
    reader2.onDataReady(data => {
        console.log(`Second call data: ${data}`)
    })
})

正如您所看到的,第二个读取器的回调永远不会被调用。 让我们看看为什么:

  • 在创建 reader1 期间,我们的 inconfirmedRead() 函数会异步运行,因为没有可用的缓存结果。 这意味着任何 onDataReady 侦听器都将在事件循环的另一个周期中稍后被调用,因此我们有足够的时间来注册侦听器。

  • 然后,在事件循环的循环中创建 reader2,其中所请求文件的缓存已存在。 在这种情况下,对不一致 Read() 的内部调用将是同步的。 因此,它的回调将立即被调用,这意味着 reader2 的所有侦听器也将被同步调用。 但是,我们在创建 reader2 之后注册侦听器,因此它永远不会被调用。

inconsistentRead() 函数的回调行为确实是不可预测的,因为它取决于许多因素,例如调用的频率、作为参数传递的文件名以及加载文件所花费的时间。

您刚刚看到的错误在实际应用中的识别和重现可能会非常复杂。想象一下在网络服务器中使用类似的功能,可能会有多个并发请求。想象一下,看到其中一些请求挂起,没有任何明显的原因,也没有记录任何错误。这绝对是一个令人讨厌的缺陷。

npm 的创建者、前 Node.js 项目负责人 Isaac Z.Schlueter 在他的一篇博文中将使用这种不可预测的函数比作 unleashing Zalgo。

Zalgo 是一个关于不祥实体的网络传说,据信它会导致精神错乱、死亡和世界毁灭。如果你对 Zalgo 还不熟悉,我们邀请你来了解一下它到底是什么。

您可以在 nodejsdp.link/unleashing-zalgo 找到 Isaac Z.Schlueter 的原始帖子。

使用同步 API

从 Zalgo 的例子中我们可以学到的教训是,API 必须明确定义其性质:同步或异步。

对不一致的 Read() 函数的一种可能的修复方法是使其完全同步。 这是可能的,因为 Node.js 为大多数基本 I/O 操作提供了一组同步直接样式 API。 例如,我们可以使用 fs.readFileSync() 函数代替其异步对应函数。 代码将变成如下:

import { readFileSync } from 'fs'

const cache = new Map()

function consistentReadSync (filename) {
    if (cache.has(filename)) {
        return cache.get(filename)
    } else {
        const data = readFileSync(filename, 'utf8')
        cache.set(filename, data)
        return data
    }
}

可以看到整个函数也转换成了直接风格。 如果函数是同步的,则没有理由具有 CPS。 事实上,使用直接风格实现同步 API 始终是最佳实践。 这将消除对其性质的任何混淆,并且从性能角度来看也将更加高效。

模式

对于纯同步函数,始终选择直接样式。

请记住,将 API 从 CPS 更改为直接样式,或者从异步更改为同步,反之亦然,可能还需要更改使用它的所有代码的样式。 例如,在我们的例子中,我们必须完全更改 createFileReader() API 的接口并对其进行调整,以便它始终同步工作。

此外,使用同步 API 而不是异步 API 有一些注意事项:

  • 用于特定功能的同步 API 可能并不总是可用。

  • 同步 API 将阻止事件循环并暂停任何并发请求。 这将破坏 Node.js 并发模型,从而减慢整个应用程序的速度。 您将在本书后面看到这对我们的应用程序真正意味着什么。

在我们的 consistentReadSync() 函数中,阻塞事件循环的风险得到了部分缓解,因为每个文件名仅调用一次同步 I/O API,而缓存的值将用于所有后续调用。 如果我们的静态文件数量有限,那么使用 consistentReadSync() 不会对我们的事件循环产生很大的影响。 如果我们必须读取许多文件并且只读取一次,事情可能会很快发生变化。

在许多情况下,强烈建议不要在 Node.js 中使用同步 I/O,但在某些情况下,这可能是最简单、最有效的解决方案。 请始终评估您的具体用例,以便选择正确的替代方案。 例如,在启动应用程序时使用同步阻塞 API 加载配置文件是非常有意义的。

模式

在不影响应用程序处理并发异步操作的情况下,尽量少用阻塞 API。

通过延迟执行保证异步性

修复不一致的 Read() 函数的另一种选择是使其纯粹异步。 这里的技巧是安排同步回调调用在 “将来” 执行,而不是在同一事件循环周期中立即运行。 在 Node.js 中,这可以通过 process.nextTick() 实现,它在当前运行的操作完成后推迟函数的执行。 它的功能非常简单:它将回调作为参数,并将其推送到事件队列的顶部,位于任何挂起的 I/O 事件之前,并立即返回。 一旦当前运行的操作将控制权交还给事件循环,就会调用回调。

让我们应用这种技术来修复不一致的 Read() 函数,如下所示:

import { readFile } from 'fs'

const cache = new Map()

function consistentReadAsync (filename, callback) {
    if (cache.has(filename)) {
        // deferred callback invocation
        process.nextTick(() => callback(cache.get(filename)))
    } else {
        // asynchronous function
        readFile(filename, 'utf8', (err, data) => {
            cache.set(filename, data)
            callback(data)
        })
    }
}

现在,由于 process.nextTick(),我们的函数在任何情况下都保证异步调用其回调。 尝试使用它来代替不一致的 Read() 函数,并验证 Zalgo 确实已被根除。

模式

通过使用 process.nextTick() 推迟回调的执行,可以保证异步调用回调。

另一个用于推迟代码执行的 API 是 setImmediate()。 虽然其目的与 process.nextTick() 非常相似,但其语义却截然不同。 使用 process.nextTick() 延迟的回调称为微任务,它们在当前操作完成后立即执行,甚至在触发任何其他 I/O 事件之前执行。 另一方面,使用 setImmediate() 时,执行会在处理所有 I/O 事件之后的事件循环阶段中排队。 由于 process.nextTick() 在任何已调度的 I/O 之前运行,因此执行速度会更快,但在某些情况下,它也可能无限期地延迟任何 I/O 回调的运行(也称为 I/O 饥饿),例如 就像存在递归调用一样。 使用 setImmediate() 永远不会发生这种情况。

使用 setTimeout(callback, 0) 的行为与 setImmediate() 相当,但在典型情况下,使用 setImmediate() 调度的回调的执行速度比使用 setTimeout(callback, 0) 调度的回调执行得更快。 要了解原因,我们必须考虑事件循环在不同阶段执行所有回调; 对于我们正在考虑的事件类型,我们有在 I/O 回调之前执行的计时器 (setTimeout()),而 I/O 回调又在 setImmediate() 回调之前执行。 这意味着,如果我们在 setTimeout() 回调、I/O 回调中或在这两个阶段之后排队的微任务中使用 setImmediate() 对任务进行排队,那么回调将在紧随其后的阶段中执行。 我们当前所处的阶段。setTimeout() 回调必须等待事件循环的下一个周期。

当我们在本书后面分析使用延迟调用来运行同步 CPU 密集型任务时,您将更好地理解这些 API 之间的差异。

接下来,我们将探讨用于在 Node.js 中定义回调的约定。

Node.js 回调约定

在 Node.js 中,CPS API 和回调遵循一组特定约定。 这些约定适用于 Node.js 核心 API,但绝大多数用户区模块和应用程序也遵循它们。 因此,了解它们并确保在需要设计使用回调的异步 API 时遵守它们非常重要。

回调最后出现

在所有核心 Node.js 函数中,标准约定是当函数接受回调作为输入时,必须将其作为最后一个参数传递。

我们以以下 Node.js 核心 API 为例:

readFile(filename, [options], callback)

从前面函数的签名中可以看出,即使存在可选参数,回调函数也总是放在最后一个位置。这样做的原因是,如果回调函数是就地定义的,那么函数调用的可读性会更好。

任何错误总是先出现

在 CPS 中,错误会像其他类型的结果一样传播,这意味着要使用回调。在 Node.js 中,CPS 函数产生的任何错误都会作为回调的第一个参数传递,而任何实际结果都会从第二个参数开始传递。如果操作成功且没有错误,则第一个参数为 null 或 undefined。下面的代码向您展示了如何定义符合这一约定的回调:

readFile('foo.txt', 'utf8', (err, data) => {
    if(err) {
        handleError(err)
    } else {
        processData(data)
    }
})

最好的做法是始终检查是否存在错误,否则会增加调试代码和发现可能故障点的难度。另一个需要考虑的重要约定是,错误必须始终属于 Error 类型。这意味着绝不能将简单的字符串或数字作为错误对象传递。

传播错误

在同步、直接风格函数中传播错误是通过众所周知的 throw 语句完成的,这会导致错误在调用堆栈中向上跳转,直到被捕获。

然而,在异步 CPS 中,只需将错误传递到链中的下一个回调即可完成正确的错误传播。 典型的模式如下所示:

import { readFile } from 'fs'

function readJSON (filename, callback) {

    readFile(filename, 'utf8', (err, data) => {
        let parsed
        if (err) {
            // propagate the error and exit the current function
            return callback(err)
        }

        try {
            // parse the file contents
            parsed = JSON.parse(data)
        } catch (err) {
            // catch parsing errors
            return callback(err)
        }
        // no errors, propagate just the data
        callback(null, parsed)
    })
}

请注意我们如何传播 readFile() 操作收到的错误。 我们不会扔掉或退回它; 相反,我们只是使用回调,就好像它是任何其他结果一样。 另外,请注意我们如何使用 try…​catch 语句来捕获 JSON.parse() 抛出的任何错误,这是一个同步函数,因此使用传统的 throw 指令将错误传播给调用者。 最后,如果一切顺利,则会使用 null 作为第一个参数来调用回调,以指示没有错误。

值得注意的是我们如何避免从 try 块内调用回调(callback)。 这是因为这样做会捕获回调本身执行时抛出的任何错误,这通常不是我们想要的。

未捕获异常

有时,可能会发生错误被抛出但未在异步函数的回调中捕获的情况。 例如,如果我们忘记在之前定义的 readJSON() 函数中使用 try…​catch 语句包围 JSON.parse(),则可能会发生这种情况。 在异步回调中抛出错误会导致错误跳转到事件循环,因此它永远不会传播到下一个回调。 在 Node.js 中,这是一种不可恢复的状态,应用程序将简单地以 非零退出代码退出,并将堆栈跟踪打印到 stderr 接口。

为了演示这一点,让我们尝试删除 JSON 周围的 try…​catch 块。 parse() 来自我们之前定义的 readJSON() 函数:

function readJSONThrows (filename, callback) {
    readFile(filename, 'utf8', (err, data) => {
        if (err) {
            return callback(err)
        }
        callback(null, JSON.parse(data))
    })
}

现在,在我们刚刚定义的函数中,无法捕获来自 JSON.parse() 的最终异常。 如果我们尝试使用以下代码解析无效的 JSON 文件:

readJSONThrows('invalid_json.json', (err) => console.error(err))

这将导致应用程序突然终止,并在控制台上打印类似于以下内容的堆栈跟踪:

SyntaxError: Unexpected token h in JSON at position 1
at JSON.parse (<anonymous>)
    at file:///.../03-callbacks-and-events/08-uncaught-errors/index.
    js:8:25
    at FSReqCallback.readFileAfterClose [as oncomplete] (internal/fs/
    read_file_context.js:61:3)

现在,如果您查看前面的堆栈跟踪,您将看到它从内置 fs 模块内部开始,并且恰好从本机 API 完成读取并将其结果返回到 fs.readFile() 函数,通过事件循环。 这清楚地表明异常从我们的回调开始,向上传递到调用堆栈,然后直接进入事件循环,最终被捕获并抛出到控制台。

这也意味着用 try…​catch 块包装 readJSONThrows() 的调用将不起作用,因为该块运行的堆栈与调用回调的堆栈不同。 以下代码显示了刚刚描述的反模式:

try {
    readJSONThrows('invalid_json.json', (err) => console.error(err))
} catch (err) {
    console.log('This will NOT catch the JSON parsing exception')
}

前面的 catch 语句永远不会收到 JSON 解析错误,因为它会沿着引发错误的调用堆栈向上移动,即在事件循环中,而不是在触发异步操作的函数中。

如前所述,应用程序将在异常到达事件循环时中止。 但是,我们仍然有机会在应用程序终止之前执行一些清理或日志记录。 事实上,当发生这种情况时,Node.js 将在退出进程之前发出一个名为 uncaughtException 的特殊事件。 以下代码显示了示例用例:

process.on('uncaughtException', (err) => {
    console.error(`This will catch at last the JSON parsing exception: ${err.message}`)
    // Terminates the application with 1 (error) as exit code.
    // Without the following line, the application would continue
    process.exit(1)
})

重要的是要明白,未捕获的异常会使应用程序处于无法保证一致的状态,从而导致无法预见的问题。例如,可能仍有未完成的 I/O 请求在运行,或者闭包可能变得不一致。因此,建议在收到未捕获异常后,不要让应用程序继续运行,尤其是在生产环境中。相反,进程应立即退出,可选择在运行一些必要的清理任务后退出,最好由一个监督进程重新启动应用程序。这也被称为 “快速失败” 方法,是 Node.js 推荐的做法。

我们将在第 12 章 “可扩展性和架构模式” 中更详细地讨论 supervisors。

我们对回调模式的简单介绍到此结束。 现在,是时候了解观察者模式了,它是 Node.js 等事件驱动平台的另一个关键组件。