Async/await
正如我们刚才所看到的,承诺比回调领先了一个巨大的飞跃。 它们使我们能够编写干净且可读的异步代码,并提供一组只有在使用基于回调的异步代码时才能通过样板代码实现的保护措施。 然而,在编写顺序异步代码时,Promise 仍然不是最优的。 Promise 链确实比回调地狱要好得多,但是我们仍然必须调用 then() 并为链中的每个任务创建一个新函数。 对于日常编程中绝对最常用的控制流来说,这仍然太多了。 JavaScript 需要一种正确的方法来处理无处不在的异步顺序执行流,而答案是随着 ECMAScript 标准中异步函数(async functions)和等待表达式(await expression)(简称 async/await)的引入而出现的。
async/await 二分法允许我们编写看起来在每个异步操作处阻塞的函数,在继续执行以下语句之前等待结果。 正如我们将看到的,任何使用 async/await 的异步代码都具有与传统同步代码相当的可读性。
如今,async/await 是处理 Node.js 和 JavaScript 中异步代码的推荐结构。 然而,async/await 并不能取代我们迄今为止所学到的有关异步控制流模式的所有内容; 相反,正如我们将看到的,async/await 很大程度上依赖于 Promise。
异步函数和 await 表达式
异步函数是一种特殊类型的函数,可以使用 await 表达式来 “暂停” 给定 Promise 的执行,直到它解析为止。 让我们考虑一个简单的示例,并使用我们在创建 Promise 小节中实现的 delay() 函数。 Delay() 返回的 Promise 将当前日期解析为给定毫秒数之后的值。 让我们将此函数与 async/await 对一起使用:
async function playingWithDelays () {
console.log('Delaying...', new Date())
const dateAfterOneSecond = await delay(1000)
console.log(dateAfterOneSecond)
const dateAfterThreeSeconds = await delay(3000)
console.log(dateAfterThreeSeconds)
return 'done'
}
正如我们从前面的函数中看到的,async/await 看起来就像魔术一样。 该代码甚至看起来不包含任何异步操作。 不过,请不要误会; 该函数不同步运行(它们被称为异步函数是有原因的!)。 在每个 await 表达式中,函数的执行被暂停,其状态被保存,并且控制权返回到事件循环。 一旦等待的 Promise 解析,控制权就会交还给 async 函数,并返回 Promise 的履行值。
await 表达式适用于任何值,而不仅仅是承诺。 如果提供了 Promise 以外的值,则其行为类似于等待首先传递给 Promise.resolve() 的值。 |
现在让我们看看如何调用新的异步(async)函数:
playingWithDelays()
.then(result => {
console.log(`After 4 seconds: ${result}`)
})
从前面的代码可以清楚地看出,异步函数可以像任何其他函数一样被调用。 然而,最细心的人可能已经发现了异步函数的另一个重要属性:它们总是返回一个 Promise。 就像异步函数的返回值被传递给 Promise.resolve() 然后返回给调用者一样。
与任何其他异步操作一样,调用异步函数是即时的。 换句话说,异步函数同步返回 Promise。 该 Promise 最终将根据函数产生的结果或错误进行解决。 |
从第一次遇到 async/await 开始,我们就可以看到 Promise 在我们的讨论中仍然占主导地位。 事实上,我们可以将 async/await 视为一种语法糖,以便更简单地使用 Promise。 正如我们将看到的,所有具有 async/await 的异步控制流模式都使用 Promise 及其 API 来处理大多数繁重的操作。
使用 async/await 进行错误处理
Async/await 不仅提高了标准条件下异步代码的可读性,而且在处理错误时也有帮助。 事实上,async/await 的最大好处之一是能够标准化 try…catch 块的行为,使其能够与同步抛出和异步 Promise rejections 无缝协作。 让我们用一个例子来证明这一点。
统一的 try…catch 体验
让我们定义一个函数,该函数返回一个 Promise,该 Promise 在给定的毫秒数后拒绝并出现错误。 这与我们已经非常熟悉的 delay() 函数非常相似:
function delayError (milliseconds) {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error(`Error after ${milliseconds}ms`))
}, milliseconds)
})
}
接下来,让我们实现一个异步函数,它可以同步抛出错误或等待将拒绝的 Promise。 此函数演示了如何通过同一个 catch 块捕获同步抛出和 Promise 拒绝:
async function playingWithErrors (throwSyncError) {
try {
if (throwSyncError) {
throw new Error('This is a synchronous error')
}
await delayError(1000)
} catch (err) {
console.error(`We have an error: ${err.message}`)
} finally {
console.log('Done')
}
}
现在,像这样调用该函数:
playingWithErrors(true)
将打印到控制台以下内容:
We have an error: This is a synchronous error
Done
使用 false 作为输入调用函数时,如下所示:
playingWithErrors(false)
将产生以下输出:
We have an error: Error after 1000ms
Done
如果我们还记得第 4 章 “带回调的异步控制流模式” 中如何处理错误,我们一定会欣赏 Promise 和 async/await 带来的巨大改进。 现在,错误处理就应该是这样的:简单、可读,最重要的是,支持同步和异步错误。
"return" 与 "return await" 陷阱
使用 async/await 处理错误时,一种常见的反模式是返回一个拒绝调用者的 Promise,并期望返回该 Promise 的函数的本地 try…catch 块捕获该错误。
例如,考虑以下代码:
async function errorNotCaught () {
try {
return delayError(1000)
} catch (err) {
console.error('Error caught by the async function: ' +
err.message)
}
}
errorNotCaught()
.catch(err => console.error('Error caught by the caller: ' +
err.message))
DelayError() 返回(return)的 Promise 不会在本地等待,这意味着它会按原样返回给调用者。 因此,本地 catch 块永远不会被调用。 事实上,前面的代码会输出:
Error caught by the caller: Error after 1000ms
如果我们的目的是在本地捕获异步操作生成的任何错误,该操作产生我们想要返回给调用者的值,那么在将值返回给调用者之前,我们必须在该 Promise 上使用 await 表达式。 下面的代码演示了这一点:
async function errorCaught () {
try {
return await delayError(1000)
} catch (err) {
console.error('Error caught by the async function: ' +
err.message) }
}
errorCaught()
.catch(err => console.error('Error caught by the caller: ' +
err.message))
我们所做的就是在 return 关键字之后添加一个 await。 这足以使异步函数在本地 “处理” Promise,因此也可以在本地捕获任何拒绝。 作为确认,当我们运行前面的代码时,我们应该看到以下输出:
Error caught by the async function: Error after 1000ms
顺序执行和迭代
我们对 async/await 控制流模式的探索从顺序执行和迭代开始。 我们已经多次提到,async/await 的核心优势在于它能够使异步串行执行易于编写且易于读取。 这在我们迄今为止编写的所有代码示例中已经很明显; 然而,现在我们将开始转换网络蜘蛛版本 2,这一点将变得更加明显。 Async/await 非常易于使用和理解,因此实际上没有任何模式可供研究。 我们将直接进入代码,没有任何序言。
那么,让我们从网络蜘蛛的 download() 函数开始; 这是 async/await 的样子:
async function download (url, filename) {
console.log(`Downloading ${url}`)
const { text: content } = await superagent.get(url)
await mkdirpPromises(dirname(filename))
await fsPromises.writeFile(filename, content)
console.log(`Downloaded and saved: ${url}`)
return content
}
让我们欣赏一下 download() 函数变得多么简单和紧凑。 让我们考虑一下,相同的功能是通过两个不同函数中的回调实现的,总共使用了 19 行代码。 现在我们只有七个。 另外,代码现在完全扁平化,根本没有嵌套。 这告诉我们很多关于 async/await 对我们的代码产生的巨大积极影响。
现在,让我们看看如何使用 async/await 异步迭代数组。 这在 SpiderLinks() 函数中得到了例证:
async function spiderLinks (currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
for (const link of links) {
await spider(link, nesting - 1)
}
}
即使在这里也没有可学习的模式。 我们只是对链接列表进行简单的迭代,对于每个项目,我们等待 Spider() 返回的 Promise。
下一个代码片段显示了使用 async/await 实现的 Spider() 函数。 这里需要注意的方面是如何仅使用 try…catch 语句轻松处理错误,使所有内容更易于阅读:
export async function spider (url, nesting) {
const filename = urlToFilename(url)
let content
try {
content = await fsPromises.readFile(filename, 'utf8')
} catch (err) {
if (err.code !== 'ENOENT') {
throw err
}
content = await download(url, filename)
}
return spiderLinks(url, content, nesting)
}
通过 spider() 函数,我们已经完成了网络蜘蛛应用程序到 async/await 的转换。 正如您所看到的,这是一个相当顺利的过程,但结果却令人印象深刻。
反模式 – 使用 async/await 和 Array.forEach 进行串行执行
值得一提的是,有一个常见的反模式,开发人员会尝试使用 Array.forEach() 或 Array.map() 来通过 async/await 实现顺序异步迭代,当然,这不会按预期工作。
要了解原因,让我们看一下 SpiderLinks() 函数中异步迭代的以下替代实现(这是错误的!):
links.forEach(async function iteration(link) {
await spider(link, nesting - 1)
})
在前面的代码中,链接数组的每个元素都会调用一次迭代函数。然后,在迭代函数中,我们对 spider() 返回的 Promise 使用了 await 表达式。但 forEach() 忽略了迭代函数返回的 Promise。结果是,所有 spider() 函数都在事件循环的同一轮中被调用,这意味着它们是并行启动的,并且在调用 forEach() 后立即继续执行,而无需等待所有 spider() 操作完成。
并行执行
使用 async/await 并行运行一组任务主要有两种方法; 一种纯粹使用 await 表达式,另一种依赖 Promise.all()。 它们的实现都非常简单; 但是,请注意,依赖 Promise.all() 的方法是推荐(也是最佳)使用的方法。
让我们看一个两者的例子。 让我们考虑一下网络蜘蛛的 SpiderLinks() 函数。 如果我们想纯粹使用 await 表达式来实现无限并行异步执行流程,我们可以使用如下代码:
async function spiderLinks (currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
for (const promise of promises) {
await promise
}
}
就是这样,非常简单。在前面的代码中,我们首先并行启动所有 spider() 任务,用 map() 收集它们的 promise。然后,我们循环并 await 每一个 promise。
起初,这看起来既整洁又实用;但是,它有一个小小的意外效果。如果数组中的一个 Promise 被拒绝,我们就必须等待数组中前面的所有 Promise 都解决,然后 spiderLinks() 返回的 Promise 也会被拒绝。这在大多数情况下并不是最佳选择,因为我们通常希望尽快知道操作是否失败。幸运的是,我们已经有了一个内置函数,它的行为正是我们想要的,那就是 Promise.all()。事实上,只要输入数组中提供的任何承诺发生拒绝,Promise.all() 就会立即拒绝。因此,我们甚至可以在所有异步/等待代码中都使用该方法。此外,由于 Promise.all() 返回的只是另一个 Promise,我们只需在其上调用 await 即可获得多个异步操作的结果。下面的代码就是一个例子:
const results = await Promise.all(promises)
因此,总而言之,我们推荐的具有并行执行和 async/await 功能的 SpiderLinks() 函数的实现看起来与使用 Promise 的实现几乎相同。 唯一可见的区别是我们现在使用异步函数,它总是返回一个 Promise:
async function spiderLinks (currentUrl, content, nesting) {
if (nesting === 0) {
return
}
const links = getPageLinks(currentUrl, content)
const promises = links.map(link => spider(link, nesting - 1))
return Promise.all(promises)
}
我们刚刚了解的有关并行执行和 async/await 的知识只是重申了 async/await 与 Promise 密不可分的事实。 大多数与 Promise 配合使用的实用程序也可以与 async/await 无缝配合,我们应该毫不犹豫地在异步函数中利用它们。
限制并行执行
要使用 async/await 实现有限并行执行模式,我们可以简单地重用我们在 Promises 部分的有限并行执行小节中创建的 TaskQueue 类。 我们可以按原样使用它,也可以将其内部转换为 async/await。 将 TaskQueue 类转换为 async/await 是一个简单的操作,我们将其留给您作为练习。 无论哪种方式,TaskQueue 外部接口都不应该改变; 两种实现都会有一个 runTask() 方法,该方法返回一个 Promise,该 Promise 在队列运行任务时结算。
从这个假设开始,将网络蜘蛛 v4 从 Promise 转换为 async/await 也是一项微不足道的任务,我们不会在这里展示所有步骤,因为我们不会学习任何新东西。 相反,我们在本节中要做的是检查使用 async/await 和生产者消费者方法的 TaskQueue 类的第三种变体。
将这种方法应用于我们的问题的总体思路如下:
-
一方面,我们有一组未知的生产者将任务添加到队列中。
-
另一方面,我们有一组预定义的使用者,负责从队列中提取并执行任务,一次一个。
下图应该可以帮助我们理解设置:
/image-2024-05-06-23-09-46-625.png)
消费者的数量将决定任务执行的并发度。 这里的挑战是当队列为空时让消费者 “睡眠”,并在有新任务要运行时再次 “唤醒” 他们。 但我们很幸运,由于 Node.js 是单线程的,因此将任务置于 “睡眠” 状态只是意味着将控制权交还给事件循环,而“恢复”任务相当于调用回调。
考虑到这一点,让我们看一些代码。 我们将创建一个名为 TaskQueuePC 的新类,其公共接口类似于我们在本章前面实现的 TaskQueue 类之一。 采用自上而下的方法,让我们看看如何实现构造函数:
export class TaskQueuePC {
constructor (concurrency) {
this.taskQueue = []
this.consumerQueue = []
// spawn consumers
for (let i = 0; i < concurrency; i++) {
this.consumer()
}
}
// ...
首先,我们可以注意到现在有两个队列,一个用于存放任务(taskQueue),另一个用于存放休眠的消费者(consumerQueue)。这些队列将如何使用稍后会更清楚。在构造函数的第二部分,我们会根据想要达到的并发量生成相应数量的消费者。让我们看看消费者是什么样的:
async consumer () {
while (true) { // (1)
try {
const task = await this.getNextTask() // (2)
await task() // (3)
} catch (err) {
console.error(err) // (4)
}
}
}
首先,我们可以注意到我们现在有两个队列,一个用于保存任务(taskQueue),另一个用于存储休眠的消费者(consumerQueue)。 稍后将更清楚如何使用这些队列。 在构造函数的第二部分中,我们根据想要获得的并发性生成尽可能多的消费者。 让我们看看消费者是什么样子的:
从表面上看,TaskQueuePC 中的每个消费者似乎都是一个实际的线程。 事实上,我们的 consumer() 函数有一个无限循环,它可以 “暂停” 直到被其他 “线程” 唤醒。 实际上,我们不应该忘记每个消费者都是一个异步函数,这只不过是围绕承诺和回调构建的良好语法。 while 循环看似不断旋转,消耗 CPU 周期,但在底层,该循环比传统的 while 循环更类似于异步递归。 |
通过下一个代码片段,我们应该开始了解发生了什么。 我们看一下 getNextTask() 的实现:
async getNextTask () {
return new Promise((resolve) => {
if (this.taskQueue.length !== 0) {
return resolve(this.taskQueue.shift()) // (1)
}
this.consumerQueue.push(resolve) // (2)
})
}
getNextTask() 方法返回一个新的 Promise,如果队列不为空,则该 Promise 会解析队列中的第一个任务。 第一个任务从任务队列中删除并用作调用 resolve (1) 的参数。 如果队列为空,我们将通过将解析回调排队到消费者队列中来推迟 Promise 的解析。 这将有效地让 Promise 以及等待 Promise 的消费者进入睡眠状态。
现在是整个 TaskQueuePC 类的 “粘合” 部分,它对应于算法的生产者端。 这是在 runTask() 方法中实现的:
runTask (task) {
return new Promise((resolve, reject) => {
const taskWrapper = () => { // (1)
const taskPromise = task()
taskPromise.then(resolve, reject)
return taskPromise
}
if (this.consumerQueue.length !== 0) { // (2)
const consumer = this.consumerQueue.shift()
consumer(taskWrapper)
} else { // (3)
this.taskQueue.push(taskWrapper)
}
})
}
首先,我们创建一个 taskWrapper 函数(1),它在执行时负责运行输入任务并将 task() 返回的 Promise 的状态转发到 runTask() 返回的外部 Promise。 接下来,如果 consumerQueue 不为空(2),则意味着至少有一个消费者处于休眠状态,等待新任务运行。 然后,我们从队列中提取第一个消费者(记住,这本质上是 getNextTask() 返回的 Promise 的解析回调),并通过传递我们的 taskWrapper 立即调用它。 相反,如果所有消费者都已忙 (3),我们将 taskWrapper 推入任务队列。
我们的 TaskQueuePC 类的实现到此结束。 TaskQueuePC 类的公共接口与我们在 Promises 部分中实现的 TaskQueue 类的公共接口相同,因此将网络蜘蛛的代码迁移到新算法将是一项简单的任务。
这也结束了我们对 async/await 结构的探索。 但是,在结束本章之前,我们将深入研究一个影响 Promise 的微妙问题。