异步请求批处理和缓存

在高负载应用程序中,缓存起着至关重要的作用,它在网络上几乎随处可见,从网页、图像和样式表等静态资源到数据库查询结果等纯数据。 在本节中,我们将了解缓存如何应用于异步操作以及如何将高请求吞吐量转化为我们的优势。

什么是异步请求批处理?

处理异步操作时,最基本的缓存级别可以通过对同一 API 的一组调用进行批处理来实现。 这个想法非常简单:如果我们在还有另一个待处理的异步函数时调用一个异步函数,我们可以搭载已经在运行的操作,而不是创建一个全新的请求。 看一下下图:

image 2024 05 08 09 53 53 016
Figure 1. 图 11.1:两个没有批处理的异步请求

上图显示两个客户端使用完全相同的输入调用相同的异步操作。 当然,描述这种情况的自然方式是两个客户端启动两个单独的操作,这两个操作将在两个不同的时刻完成。

现在,考虑以下场景:

image 2024 05 08 09 54 48 589
Figure 2. 图 11.2:两个异步请求的批处理

图 11.2 向我们展示了如何对两个相同的请求(使用相同的输入调用相同的 API)进行批处理,或者换句话说,将其附加到相同的正在运行的操作中。 通过这样做,当操作完成时,即使异步操作实际上只执行一次,两个客户端都会收到通知。 这代表了一种简单但极其强大的方法来优化应用程序的负载,同时不必处理更复杂的缓存机制,这些机制通常需要适当的内存管理和失效策略。

优化异步请求缓存

如果操作足够快或者匹配的请求分散在较长的时间段内,请求批处理的效率就会降低。 此外,大多数时候,我们可以放心地假设两个相同的 API 调用的结果不会经常更改,因此简单的请求批处理不会提供最佳性能。 在所有这些情况下,减少应用程序负载并提高其响应能力的最佳选择无疑是更积极的缓存机制。

这个想法很简单:请求完成后,我们将其结果存储在缓存中,缓存可以是内存中的变量或专用缓存服务器(例如 Redis)中的项目。 因此,下次调用 API 时,可以立即从缓存中检索结果,而不是产生另一个请求。

缓存的想法对于经验丰富的开发人员来说应该不是什么新鲜事,但这种技术在异步编程中的不同之处在于,它应该与请求批处理结合起来才能达到最佳效果。 这样做的原因是,在未设置缓存的情况下,多个请求可能会并发运行,并且当这些请求完成时,缓存将被设置多次。

基于这些假设,我们可以将组合请求批处理和缓存模式说明如下:

image 2024 05 08 09 56 25 245
Figure 3. 图 11.3:组合批处理和缓存

上图显示了最佳异步缓存算法的两个阶段:

  • 第一个阶段与批处理模式完全相同。 未设置缓存时收到的任何请求都将被一起批处理。 当请求完成时,缓存被设置一次。

  • 当缓存最终设置完毕后,任何后续请求都将直接从缓存中得到服务。

另一个需要考虑的关键细节是 Zalgo 反模式(正如我们在第 3 章回调和事件中看到的那样)。 由于我们正在处理异步 API,因此我们必须确保始终异步返回缓存值,即使访问缓存仅涉及同步操作,例如从内存变量检索缓存值的情况。

没有缓存或批处理的 API 服务器

在我们开始深入研究这一新挑战之前,让我们实现一个小型演示服务器,我们将使用它作为参考来衡量我们将要实现的各种技术的影响。

让我们考虑一个管理电子商务公司销售的 API 服务器。 特别是,我们希望向服务器查询特定类型商品的所有交易总和。 为此,我们将通过 level npm 包 (nodejsdp.link/level) 使用 LevelUP 数据库。 我们将使用的数据模型是存储在销售子级别(数据库的一个子部分)中的简单交易列表,其组织格式如下:

transactionId {amount, product}

键由 transactionId 表示,值是一个 JSON 对象,其中包含销售金额(amount)和产品类型(product)。

要处理的数据非常基本,因此让我们对可用于实验的数据库实现一个简单的查询。 假设我们想要获取特定产品的总销售额。 该例程如下所示(文件 totalSales.js):

import level from 'level'
import sublevel from 'subleveldown'

const db = level('example-db')
const salesDb = sublevel(db, 'sales', {valueEncoding: 'json'})

export async function totalSales(product) {
    const now = Date.now()
    let sum = 0
    for await (const transaction of salesDb.createValueStream()) {
        if (!product || transaction.product === product) {
            sum += transaction.amount
        }
    }
    console.log(`totalSales() took: ${Date.now() - now}ms`)
    return sum
}

TotalSales() 函数迭代销售子级别的所有交易并计算特定产品的金额总和。 该算法故意变慢,因为我们想稍后强调批处理和缓存的效果。 在现实世界的应用程序中,我们可以使用索引来按产品查询交易,或者更好的是,我们可以使用增量映射/归约算法来连续计算每个产品的总和

我们现在可以通过一个简单的 HTTP 服务器(server.js 文件)公开 totalSales() API:

import {createServer} from 'http'
import {totalSales} from './totalSales.js'

createServer(async (req, res) => {
    const url = new URL(req.url, 'http://localhost')
    const product = url.searchParams.get('product')
    console.log(`Processing query: ${url.search}`)
    const sum = await totalSales(product)
    res.setHeader('Content-Type', 'application/json')
    res.writeHead(200)
    res.end(JSON.stringify({
        product,
        sum
    }))
}).listen(8000, () => console.log('Server started'))

在第一次启动服务器之前,我们需要使用一些示例数据填充数据库。 我们可以使用 populateDb.js 脚本来完成此操作,该脚本可以在本书的代码存储库中专用于本节的文件夹中找到。 此脚本在数据库中创建 100,000 个随机销售交易,以便我们的查询花费一些时间来处理数据:

node populateDb.js

好的! 现在,一切准备就绪。 让我们启动服务器:

node server.js

要查询服务器,您只需使用浏览器导航到以下 URL:

http://localhost:8000?product=book

然而,为了更好地了解服务器的性能,我们将需要多个请求。 因此,我们将使用一个名为 loadTest.js 的小脚本,它以 200 毫秒的间隔发送 20 个请求。 该脚本可以在本书的代码存储库中找到,并且它已经配置为连接到服务器的本地 URL,因此,要运行它,只需执行以下命令:

node loadTest.js

我们将看到这 20 个请求需要一段时间才能完成。 记下测试的总执行时间。 现在,我们将应用优化并衡量可以节省多少时间。 我们将首先利用 Promise 的属性来实现批处理和缓存。

使用 Promise 进行批处理和缓存

Promise 是实现异步批处理和请求缓存的绝佳工具。 让我们看看为什么。 如果我们回想一下在第 5 章 “带有 Promises 和 Async/Await 的异步控制流模式” 中学到的有关 Promise 的知识,在这种情况下,可以利用两个属性来发挥我们的优势:

  • 多个 then() 监听器可以附加到同一个 promise。

  • then() 侦听器保证被调用(仅一次),并且即使它是在 promise 已解决之后附加的,它也能工作。 此外,then() 保证始终被异步调用。

简而言之,第一个属性正是我们批处理请求所需的,而第二个属性意味着 Promise 已经是解析值的缓存,并提供了一种以一致、异步的方式返回缓存值的自然机制。 换句话说,这意味着批处理和缓存通过 Promise 变得极其简单和简洁。

总销售 Web 服务器中的批处理请求

现在让我们在 TotalSales API 之上添加一个批处理层。 我们要使用的模式非常简单:如果调用 API 时有另一个相同的请求待处理,我们将等待该请求完成,而不是启动新的请求。 正如我们将看到的,这可以通过承诺轻松实现。 事实上,我们所要做的就是将承诺保存在映射中,每次启动新请求时将其与指定的请求参数(在我们的例子中为产品类型)相关联。 然后,在每个后续请求中,我们都会检查是否已存在对指定产品的承诺,如果有,我们只需将其返回; 否则,我们将发起新的请求。

现在,让我们看看如何将其转换为代码。 让我们创建一个名为totalSalesBatch.js 的新模块。 在这里,我们将在原始 TotalSales() API 之上实现一个批处理层:

import {totalSales as totalSalesRaw} from './totalSales.js'

const runningRequests = new Map()

export function totalSales(product) {
    if (runningRequests.has(product)) { // (1)
        console.log('Batching')
        return runningRequests.get(product)
    }
    const resultPromise = totalSalesRaw(product) // (2)
    runningRequests.set(product, resultPromise)
    resultPromise.finally(() => {
        runningRequests.delete(product)
    })
    return resultPromise
}

TotalSalesBatch 模块的 totalSales() 函数是原始 totalSales() API 的代理,其工作原理如下:

  1. 如果给定产品的 promise 已经存在,我们只需返回它。 这是我们搭载已经运行的请求的地方。

  2. 如果没有针对给定产品运行的请求,我们将执行原始的 TotalSales() 函数,并将生成的 Promise 保存到 runningRequests 映射中。 接下来,我们确保请求完成后立即从 runningRequests 映射中删除相同的 Promise。

新的 totalSales() 函数的行为与原来的 totalSales() API 的行为相同,不同之处在于,现在使用相同输入对 API 的多次调用是批处理的,从而节省了我们的时间和资源。

想知道与原始的、非批处理版本的 totalSales() API 相比,性能有何改进? 然后,我们将 HTTP 服务器使用的 TotalSales 模块替换为我们刚刚创建的模块(app.js 文件):

// import { totalSales } from './totalSales.js'
import { totalSales } from './totalSalesBatch.js'

createServer(async (req, res) => {
// ...

如果我们现在尝试再次启动服务器并对其运行负载测试,我们将看到的第一件事是请求批量返回。 这是我们刚刚实施的配方的效果,它是其工作原理的一个很好的实际演示。

除此之外,我们还应该观察到执行测试的总时间显着减少。 它应该比针对普通 TotalSales() API 执行的原始测试至少快四倍!

这一结果证实了我们只需应用一个简单的批处理层即可获得巨大的性能提升,而无需管理成熟的缓存的所有复杂性,更重要的是,无需担心失效策略。

请求批处理模式在高负载应用程序和慢速 API 中发挥其最大潜力。 这是因为正是在这些情况下,我们可以将大量请求批量处理在一起。

现在让我们看看如何使用我们刚刚探索的技术的细微变化来实现批处理和缓存。

在总销售 Web 服务器中缓存请求

多亏了 Promise,向我们的批处理 API 添加缓存层非常简单。 我们所要做的就是在请求映射中留下承诺,即使在请求完成之后也是如此。

让我们直接实现 totalSalesCache.js 模块:

import {totalSales as totalSalesRaw} from './totalSales.js'

const CACHE_TTL = 30 * 1000 // 30 seconds TTL
const cache = new Map()

export function totalSales(product) {
    if (cache.has(product)) {
        console.log('Cache hit')
        return cache.get(product)
    }
    const resultPromise = totalSalesRaw(product)
    cache.set(product, resultPromise)
    resultPromise.then(() => {
        setTimeout(() => {
            cache.delete(product)
        }, CACHE_TTL)
    }, err => {
        cache.delete(product)
        throw err
    })
    return resultPromise
}

启用缓存的相关代码已突出显示。 我们所要做的就是在请求完成后一定时间(CACHE_TTL)后从缓存中删除承诺,或者如果请求失败则立即从缓存中删除承诺。 这是一种非常基本的缓存失效技术,但它非常适合我们的演示。

现在,我们准备尝试刚刚创建的 totalSales() 缓存包装器。 为此,我们只需要更新 app.js 模块,如下所示:

// import { totalSales } from './totalSales.js'
// import { totalSales } from './totalSalesBatch.js'
import { totalSales } from './totalSalesCache.js'

createServer(async (req, res) => {
// ...

现在,可以再次启动服务器并使用 loadTest.js 脚本进行分析,就像我们在前面的示例中所做的那样。 使用默认测试参数,与简单批处理相比,我们应该看到执行时间减少了 10%。 当然,这很大程度上取决于很多因素; 例如,收到的请求数量以及一个请求与另一个请求之间的延迟。 当请求数量较多且持续时间较长时,使用缓存相对于批处理的优势将更加明显。

关于实现缓存机制的注意事项

我们必须记住,在现实应用中,我们可能希望使用更先进的缓存失效技术和存储机制。 出于以下原因,这是必要的:

  • 大量缓存值很容易消耗大量内存。 在这种情况下,可以应用最近最少使用(LRU)或先进先出(FIFO)策略来维持恒定的存储器利用率。

  • 当应用程序分布在多个进程中时,将缓存保留在内存中可能会在每个服务器实例上产生不同的结果。 如果这对于我们正在实现的特定应用程序来说是不希望的,那么解决方案是使用共享存储来存储缓存。 这也比简单的内存解决方案性能更高,因为缓存在多个实例之间共享。 流行的缓存解决方案包括 Redis (nodejsdp.link/redis) 和 Memcached (nodejsdp.link/memcached)。

  • 与定时过期相反,手动缓存失效(例如,当相关的非缓存值发生更改时)可以启用更长久的缓存,同时提供更多最新数据,但是, 当然,管理起来会复杂得多。 我们不要忘记 Phil Karlton(Netscape、Silicon Graphics 等公司的首席工程师)的名言:“计算机科学中只有两件事很难:缓存失效和命名。”

至此,我们结束了关于请求批处理和缓存的本节。 接下来,我们将学习如何解决一个棘手的问题:取消异步操作。