处理异步初始化的组件

Node.js 核心模块和许多 npm 包中存在同步 API 的原因之一是它们可以方便地用于实现初始化任务。 对于简单的程序,在初始化时使用同步 API 可以大大简化工作,并且与其使用相关的缺点仍然受到限制,因为它们仅使用一次,即在程序或特定组件初始化时使用。

不幸的是,这并不总是可能的。 同步 API 可能并不总是可用,特别是对于在初始化阶段使用网络来执行握手协议或检索配置参数等操作的组件。 许多数据库驱动程序和中间件系统(例如消息队列)的客户端都是这种情况。

异步初始化组件的问题

让我们考虑一个名为 db 的模块用于与远程数据库交互的示例。 只有在与数据库服务器的连接和握手成功完成后,db 模块才会接受 API 请求。 因此,在初始化阶段完成之前,不能发送任何查询或其他命令。 以下是此类示例模块(db.js 文件)的代码:

import {EventEmitter} from 'events'

class DB extends EventEmitter {
    connected = false

    connect() {
        // simulate the delay of the connection
        setTimeout(() => {
            this.connected = true
            this.emit('connected')
        }, 500)
    }

    async query(queryString) {
        if (!this.connected) {
            throw new Error('Not connected yet')
        }
        console.log(`Query executed: ${queryString}`)
    }
}

export const db = new DB()

这是异步初始化组件的典型示例。 在这些假设下,我们通常有两种快速简单的解决方案来解决这个问题,我们可以称之为本地初始化检查和延迟启动。 让我们更详细地分析它们。

本地初始化检查

第一个解决方案确保模块在调用其任何 API 之前进行初始化; 否则,我们等待它的初始化。 每次我们想要在异步模块上调用操作时都必须执行此检查:

import {once} from 'events'
import {db} from './db.js'

db.connect()

async function updateLastAccess() {
    if (!db.connected) {
        await once(db, 'connected')
    }
    await db.query(`INSERT (${Date.now()}) INTO "LastAccesses"`)
}

updateLastAccess()
setTimeout(() => {
    updateLastAccess()
}, 600)

正如我们已经预料到的,任何时候我们想要在 db 组件上调用 query() 方法,我们都必须检查模块是否已初始化; 否则,我们通过监听 “connected” 事件来等待其初始化。 该技术的一种变体在 query() 方法本身内部执行检查,这将样板代码的负担从消费者转移到了服务的提供者。

延迟启动

针对异步初始化组件问题的第二个快速但肮脏的解决方案涉及延迟依赖于异步初始化组件的任何代码的执行,直到该组件完成其初始化例程。 我们可以在下面的代码片段中看到这种技术的示例:

import {db} from './db.js'
import {once} from 'events'

async function initialize() {
    db.connect()
    await once(db, 'connected')
}

async function updateLastAccess() {
    await db.query(`INSERT (${Date.now()}) INTO "LastAccesses"`)
}

initialize()
    .then(() => {
        updateLastAccess()
        setTimeout(() => {
            updateLastAccess()
        }, 600)
    })

从前面的代码可以看出,我们首先等待初始化完成,然后继续执行任何使用 db 对象的例程。

这种技术的主要缺点是它要求我们提前知道哪些组件将使用异步初始化的组件,这使得我们的代码脆弱并且容易出错。 解决此问题的一种方法是延迟整个应用程序的启动,直到所有异步服务都初始化完毕。 这样做的优点是简单有效; 但是,它可能会显着增加应用程序的整体启动时间,而且它不会考虑必须重新初始化异步初始化组件的情况。

正如我们将在下一节中看到的,还有第三种替代方案,它允许我们透明且有效地延迟每个操作,直到异步初始化步骤完成。

预初始化队列

确保仅在组件初始化后才调用组件服务的另一个方法涉及使用队列和命令模式。 这个想法是在组件尚未初始化时将接收到的方法调用(仅那些需要初始化组件的方法调用)排队,然后在所有初始化步骤完成后立即执行它们。

让我们看看如何将此技术应用于我们的示例数据库组件:

import {EventEmitter} from 'events'

class DB extends EventEmitter {
    connected = false
    commandsQueue = []

    async query(queryString) {
        if (!this.connected) {
            console.log(`Request queued: ${queryString}`)
            return new Promise((resolve, reject) => { // (1)
                const command = () => {
                    this.query(queryString)
                        .then(resolve, reject)
                }
                this.commandsQueue.push(command)
            })
        }
        console.log(`Query executed: ${queryString}`)
    }

    connect() {
        // simulate the delay of the connection
        setTimeout(() => {
            this.connected = true
            this.emit('connected')
            this.commandsQueue.forEach(command => command()) // (2)
            this.commandsQueue = []
        }, 500)
    }
}

export const db = new DB()

正如我们已经提到的,这里描述的技术由两部分组成:

  1. 如果组件尚未初始化(在我们的例子中,即连接属性为 false),我们根据当前调用收到的参数创建一个命令 并将其推送到 commandsQueue 数组中。 当命令执行时,它将再次运行原始的 query() 方法,并将结果转发给我们返回给调用者的 Promise。

  2. 当组件的初始化完成时(在我们的例子中,这意味着与数据库服务器的连接已建立),我们将遍历 commandsQueue,执行之前排队的所有命令。

使用我们刚刚实现的 DB 类,在调用其方法之前无需检查组件是否已初始化。 事实上,所有逻辑都嵌入在组件本身中,任何使用者都可以透明地使用它,而不必担心其初始化状态。

我们还可以更进一步,尝试减少刚刚创建的 DB 类的样板文件,同时提高其模块化性。 我们可以通过应用状态模式来实现这一点,我们在第 9 章 “行为设计模式” 中了解到该模式有两种状态:

  • 第一个状态实现了组件需要初始化的所有方法,并且只有在初始化成功时才被激活。 这些方法中的每一个都实现自己的业务逻辑,而不用担心数据库组件的初始化状态。

  • 第二个状态在初始化完成之前激活,它实现与第一个状态相同的方法,但它们在这里的唯一作用是添加一个 使用传递给调用的参数将新命令添加到队列中。

让我们看看如何将我们刚刚描述的结构应用到我们的数据库组件中。 首先,我们创建 InitializedState,它实现组件的实际业务逻辑:

class InitializedState {
    async query (queryString) {
        console.log(`Query executed: ${queryString}`)
    }
}

正如我们所看到的,我们需要在 InitializedState 类中实现的唯一方法是 query() 方法,该方法在收到新查询时会向控制台打印一条消息。

接下来,我们实现 QueuingState,这是我们配方的核心。 该状态实现了排队逻辑:

const METHODS_REQUIRING_CONNECTION = ['query']
const deactivate = Symbol('deactivate')

class QueuingState {
    constructor(db) {
        this.db = db
        this.commandsQueue = []
        METHODS_REQUIRING_CONNECTION.forEach(methodName => {
            this[methodName] = function (...args) {
                console.log('Command queued:', methodName, args)
                return new Promise((resolve, reject) => {
                    const command = () => {
                        db[methodName](...args)
                            .then(resolve, reject)
                    }
                    this.commandsQueue.push(command)
                })
            }
        })
    }

    [deactivate]() {
        this.commandsQueue.forEach(command => command())
        this.commandsQueue = []
    }
}

有趣的是,QueuingState 主要是如何在创建时动态构建的。 对于每个需要活动连接的方法,我们为当前实例创建一个新方法,该方法将代表函数调用的新命令排队。 当稍后执行该命令时,当建立连接时,数据库实例上的方法调用结果将转发给调用者(通过返回的 Promise)。

这个状态类的另一个重要部分是 [deactivate]()。 当状态停用(即组件初始化时)时调用此方法,并执行队列中的所有命令。 请注意我们如何使用 Symbol 来命名该方法。

如果我们向状态添加更多方法,这将避免将来发生任何名称冲突(例如,如果我们需要装饰 DB 类的假设 deactivate() 方法怎么办?)。

现在,是时候使用我们刚刚描述的两种状态重新实现 DB 类了:

class DB extends EventEmitter {
    constructor() {
        super()
        this.state = new QueuingState(this) // (1)
    }

    async query(queryString) {
        return this.state.query(queryString) // (2)
    }

    connect() {
        // simulate the delay of the connection
        setTimeout(() => {
            this.connected = true
            this.emit('connected')
            const oldState = this.state // (3)
            this.state = new InitializedState(this)
            oldState[deactivate] && oldState[deactivate]()
        }, 500)
    }
}

export const db = new DB()

让我们进一步分析一下新的 DB 类最重要的部分:

  1. 在构造函数中,我们初始化实例的当前状态。 它将是 QueuingState,因为组件的异步初始化尚未完成。

  2. 我们类中实现某些(存根)业务逻辑的唯一方法是 query() 方法。 在这里,我们所要做的就是在当前活动状态上调用同名方法。

  3. 最后,当我们建立与数据库的连接(初始化完成)时,我们将当前状态切换到 InitializedState,并停用旧状态。 正如我们之前所看到的,停用 QueuedState 的效果是所有已排队的命令现在都会被执行。

我们可以立即看到这种方法如何允许我们减少样板文件,同时创建一个纯粹的业务逻辑类(InitializedState),而无需任何重复的初始化检查。

仅当我们可以修改异步初始化组件的代码时,我们刚刚看到的方法才有效。 在所有无法对组件进行修改的情况下,我们将需要创建一个包装器或代理,但该技术与我们在这里看到的技术基本相似。

in the wild

我们刚刚介绍的模式被许多数据库驱动程序和 ORM 库使用。 最值得注意的是 Mongoose (nodejsdp.link/mongoose),它是 MongoDB 的 ORM。 使用 Mongoose,无需等待数据库连接打开即可发送查询。 这是因为每个操作都会排队,然后在与数据库的连接完全建立后执行,正如我们在本节中所描述的那样。 对于任何想要提供良好的开发人员体验 (DX) 的 API 来说,这显然是必须的。

看一下 Mongoose 的代码,看看如何代理原生驱动中的每个方法来添加预初始化队列。 这也演示了实现我们在本节中介绍的方法的另一种方法。 您可以在 nodejsdp.link/mongoose-init-queue 找到相关的代码片段。

类似地,pg 包 (nodejsdp.link/pg) 是 PostgreSQL 数据库的客户端,它利用预初始化队列,但方式略有不同。 pg 将每个查询排队,无论数据库的初始化状态如何,然后立即尝试执行队列中的所有命令。 查看 nodejsdp.link/pg-queue 中的相关代码行。