回调最佳实现和控制流模式

现在您已经遇到了回调地狱的第一个示例,您知道应该绝对避免什么; 然而,这并不是编写异步代码时唯一需要考虑的问题。 事实上,在某些情况下,控制一组异步任务的流程需要使用特定的模式和技术,特别是当我们仅使用纯 JavaScript 而无需任何外部库的帮助时。 例如,通过按顺序应用异步操作来迭代集合并不像在数组上调用 forEach() 那么容易; 它实际上需要类似于递归的技术。

在本节中,您不仅将了解如何避免回调地狱,还将了解如何仅使用简单明了的 JavaScript 来实现一些最常见的控制流模式。

回调规则

编写异步代码时,需要牢记的第一条规则是:定义回调函数时不要滥用原地函数定义。这样做固然诱人,因为它似乎不需要在模块化和可重用性等方面进行额外思考。但是,您已经看到过这种做法的弊大于利。

解决回调地狱问题通常不需要任何额外的库、花哨的技术或范式转换,只需要一些常识即可。以下是一些基本原则,可以帮助我们降低嵌套级别,并改善整体代码组织:

  • 尽快退出。根据上下文使用 return、continue 或 break 来立即退出当前语句,而不是编写(和嵌套)完整的 if…​else 语句。这将有助于使我们的代码保持较浅深度。

  • 为回调创建已命名的函数,使其远离闭包,并将中间结果作为参数传递。为函数命名还能让它们在堆栈跟踪中看起来更美观。

  • 将代码模块化。尽可能将代码拆分成更小的、可重复使用的函数。

现在,让我们将这些原则付诸实践。

应用回调规则

为了展示上一节中提到的想法的威力,让我们应用它们来修复网络蜘蛛应用程序中的回调地狱。

第一步,我们可以通过删除 else 语句来重构错误检查模式。这可以通过在收到错误后立即返回函数来实现。这样,代码就不会像下面这样了:

if (err) {
    cb(err)
} else {
    // code to execute when there are no errors
}

我们可以改写如下代码,从而改进代码的组织结构:

if (err) {
    return cb(err)
}
// code to execute when there are no errors

这通常被称为 提前返回原则。 通过这个简单的技巧,我们立即减少了函数的嵌套级别。 它很简单,不需要任何复杂的重构。

执行刚才描述的优化时的一个常见错误是忘记在调用回调后终止该函数。 对于错误处理场景,以下代码是典型的缺陷来源:

if (err) {
    callback(err)
}
// code to execute when there are no errors.

我们永远不应该忘记,即使在调用回调之后,函数的执行也会继续。 因此,插入返回指令以阻止函数其余部分的执行非常重要。 另外,请注意,函数返回什么值并不重要;重要的是函数返回什么值。 真正的结果(或错误)是异步生成的并传递给回调。 异步函数的返回值通常被忽略。 该属性允许我们编写如下所示的快捷方式:

return callback(...)

否则,我们就必须编写稍微冗长的代码,如下所示:

callback(...)
return

作为 Spider() 函数的第二次优化,我们可以尝试识别可重用的代码片段。 例如,将给定字符串写入文件的功能可以轻松分解为单独的函数,如下所示:

function saveFile (filename, contents, cb) {
    mkdirp(path.dirname(filename), err => {
        if (err) {
            return cb(err)
        }
        fs.writeFile(filename, contents, cb)
    })
}

遵循相同的原则,我们可以创建一个名为 download() 的通用函数,它将 URL 和文件名作为输入,并将 URL 下载到给定文件中。 在内部,我们可以使用之前创建的 saveFile() 函数:

function download (url, filename, cb) {
    console.log(`Downloading ${url}`)
    superagent.get(url).end((err, res) => {
        if (err) {
            return cb(err)
        }
        saveFile(filename, res.text, err => {
            if (err) {
                return cb(err)
            }
            console.log(`Downloaded and saved: ${url}`)
            cb(null, res.text)
        })
    })
}

对于最后一步,我们修改了 Spider() 函数,由于我们的更改,该函数现在如下所示:

export function spider (url, cb) {
    const filename = urlToFilename(url)
    fs.access(filename, err => {
        if (!err || err.code !== 'ENOENT') { // (1)
            return cb(null, filename, false)
        }
        download(url, filename, err => {
            if (err) {
                return cb(err)
            }
            cb(null, filename, true)
        })
    })
}

Spider() 函数的功能和界面保持完全相同; 改变的是代码的组织方式。 需要注意的一个重要细节 (1) 是我们反转了对文件是否存在的检查,以便我们可以应用前面讨论的提前返回原则。

通过应用早期返回原则和其他回调规则原则,我们能够大大减少代码的嵌套,同时提高其可重用性和可测试性。 事实上,我们可以考虑导出 saveFile() 和 download() ,以便我们可以在其他模块中重用它们。 这也将使我们能够作为独立单元测试它们的功能。

我们在本节中进行的重构清楚地表明,大多数时候,我们需要的只是一些纪律,以确保我们不会滥用闭包和匿名函数。 它工作得很好,只需要很少的努力,而且不需要外部库。

现在您已经知道如何使用回调编写干净的异步代码,我们准备探索一些最常见的异步模式,例如顺序执行和并行执行。

顺序执行

在本节中,我们将研究异步控制流模式,并从分析顺序执行流开始。

按顺序执行一组任务意味着一次一个接一个地运行它们。 执行顺序很重要并且必须保留,因为列表中任务的结果可能会影响下一个任务的执行。 图 4.1 说明了这个概念:

image 2024 05 06 15 37 12 205
Figure 1. 图 4.1:三个任务的顺序执行流程示例

此流程有不同的变体:

  • 按顺序执行一组已知任务,而不在它们之间传播数据。

  • 使用任务的输出作为下一个任务的输入(也称为链、管道或瀑布)。

  • 迭代集合,同时对每个元素逐一运行异步任务。

顺序执行尽管在使用直接式阻塞 API 实现时很简单,但通常是使用异步 CPS 时回调地狱问题的主要原因。

按顺序执行一组已知的任务

我们在上一节中实现 Spider() 函数时已经了解了顺序执行流程。 通过应用一些简单的规则,我们能够在顺序执行流中组织一组已知任务。 以该代码为指导,我们现在可以使用以下模式概括该解决方案:

function task1 (cb) {
    asyncOperation(() => {
        task2(cb)
    })
}

function task2 (cb) {
    asyncOperation(() => {
        task3(cb)
    })
}

function task3 (cb) {
    asyncOperation(() => {
        cb() // finally executes the callback
    })
}

task1(() => {
// executed when task1, task2 and task3 are completed
    console.log('tasks 1, 2 and 3 executed')
})

前面的模式显示了每个任务如何在完成通用异步操作后调用下一个任务。 该模式强调任务的模块化,展示了处理异步代码并不总是需要闭包。

顺序迭代

如果我们事先知道要执行哪些任务和多少个任务,上一节中描述的模式就能完美运行。这样我们就可以对序列中下一个任务的调用进行硬编码,但如果我们想对集合中的每个项目执行异步操作,会发生什么情况呢?在这种情况下,我们就不能再对任务序列进行硬编码,而必须动态地构建它。

网络蜘蛛版本2

为了展示顺序迭代的示例,让我们向网络蜘蛛应用程序引入一个新功能。 我们现在想要递归下载网页中包含的所有链接。 为此,我们将从页面中提取所有链接,然后递归地按顺序触发每个链接的网络蜘蛛。

第一步是修改我们的 Spider() 函数,以便它使用我们将很快创建的名为 SpiderLinks() 的函数触发页面所有链接的递归下载。

此外,我们不会检查文件是否已经存在,而是尝试读取它并开始抓取其链接。 这样,我们就能够恢复中断的下载。 作为最后的更改,我们需要确保传播一个新参数,即嵌套,这将帮助我们限制递归深度。 代码如下:

export function spider (url, nesting, cb) {
    const filename = urlToFilename(url)
    fs.readFile(filename, 'utf8', (err, fileContent) => {
        if (err) {
            if (err.code !== 'ENOENT') {
                return cb(err)
            }

            // The file doesn't exist, so let's download it
            return download(url, filename, (err, requestContent) => {
                if (err) {
                    return cb(err)
                }
                spiderLinks(url, requestContent, nesting, cb)
            })
        }

        // The file already exists, let's process the links
        spiderLinks(url, fileContent, nesting, cb)
    })
}

在下一节中,我们将探讨如何实现 SpiderLinks() 函数。

按顺序抓取链接

现在,我们可以创建新版本网络蜘蛛应用程序的核心,spiderLinks() 函数,它使用顺序异步迭代算法下载 HTML 页面的所有链接。 请注意我们将在以下代码块中定义它的方式:

function spiderLinks (currentUrl, body, nesting, cb) {
    if (nesting === 0) {
        // Remember Zalgo from chapter 3?
        return process.nextTick(cb)
    }

    const links = getPageLinks(currentUrl, body) // (1)
    if (links.length === 0) {
        return process.nextTick(cb)
    }

    function iterate (index) { // (2)
        if (index === links.length) {
            return cb()
        }

        spider(links[index], nesting - 1, function (err) { // (3)
            if (err) {
                return cb(err)
            }
            iterate(index + 1)
        })
    }

    iterate(0) // (4)
}

理解这个新函数的重要步骤如下:

  1. 我们使用 getPageLinks() 函数获取页面中包含的所有链接的列表。 此函数仅返回指向内部目标(相同主机名)的链接。

  2. 我们使用名为 iterate() 的本地函数迭代链接,该函数获取下一个链接的索引进行分析。 在此函数中,我们要做的第一件事是检查索引是否等于 links 数组的长度,在这种情况下,我们立即调用 cb() 函数,因为这意味着我们已经处理了所有项目。

  3. 此时,处理链接的一切都应该准备就绪。 我们通过减少嵌套级别来调用 Spider() 函数,并在操作完成时调用迭代的下一步。

  4. 作为 spiderLinks() 函数的最后一步,我们通过调用 iterate(0) 来启动迭代。

刚刚介绍的算法允许我们通过按顺序执行异步操作来迭代数组,在我们的例子中是 Spider() 函数。

最后,我们可以稍微更改一下 Spider-cli.js,以便可以将嵌套级别指定为附加命令行界面 (CLI) 参数:

import { spider } from './spider.js'

const url = process.argv[2]
const nesting = Number.parseInt(process.argv[3], 10) || 1

spider(url, nesting, err => {
    if (err) {
        console.error(err)
        process.exit(1)
    }

    console.log('Download complete')
})

我们现在可以尝试这个新版本的蜘蛛应用程序,并观察它一个接一个地递归下载网页的所有链接。 要中断该过程(如果有很多链接,这可能需要一段时间),请记住我们始终可以使用 Ctrl + C。如果我们随后决定恢复它,我们可以通过启动蜘蛛应用程序并提供我们使用的相同 URL 来实现 第一次运行。

既然我们的网络蜘蛛程序有可能触发整个网站的下载,请谨慎使用。例如,不要设置过高的嵌套级别,也不要让蜘蛛运行超过几秒钟。让服务器承受成千上万的请求是不礼貌的。在某些情况下,这还可能被视为违法行为。负责任地使用蜘蛛!

模式

上一节中的 SpiderLinks() 函数的代码清楚地说明了如何在应用异步操作时迭代集合。 您可能还注意到,这种模式可以适用于我们需要异步迭代集合元素或通常任务列表的任何其他情况。 这种模式可以概括如下:

function iterate (index) {
    if (index === tasks.length) {
        return finish()
    }
    const task = tasks[index]
    task(() => iterate(index + 1))
}

function finish () {
    // iteration completed
}

iterate(0)

值得注意的是,如果 task() 是同步操作,这些类型的算法就会变得真正递归。在这种情况下,堆栈不会在每个周期都展开,并且可能存在达到最大调用堆栈大小限制的风险。

刚刚介绍的模式非常强大,可以扩展或调整以满足多种常见需求。 仅举一些例子:

  • 我们可以异步映射数组的值。

  • 我们可以将一个操作的结果传递到迭代中的下一个操作,以实现异步版本的 reduce 算法。

  • 如果满足特定条件(Array.some() 帮助器的异步实现),我们可以提前退出循环。

  • 我们甚至可以迭代无限多个元素。

我们还可以选择通过将解决方案包装在具有如下签名的函数中来进一步概括该解决方案:

iterateSeries(collection, iteratorCallback, finalCallback)

这里,collection 是你想要迭代的实际数据集,iteratorCallback 是对每个项目执行的函数,finalCallback 是在处理所有项目或出现错误时执行的函数。 这个辅助函数的实现留给您作为练习。

顺序迭代器模式

通过创建一个名为 iterator 的函数来按顺序执行任务列表,该函数调用集合中的下一个可用任务,并确保在当前任务完成时调用迭代的下一步。

在下一节中,我们将探讨并行执行模式,当各个任务的顺序不重要时,这种模式会更方便。

并行执行

在某些情况下,一组异步任务的执行顺序并不重要,我们想要的只是在所有这些正在运行的任务完成时收到通知。 使用并行执行流程可以更好地处理这种情况,如图 4.2 所示:

image 2024 05 06 16 01 44 904
Figure 2. 图 4.2:三个任务并行执行的示例

如果您认为 Node.js 是单线程的,这可能听起来很奇怪,但如果您还记得我们在第 1 章 Node.js 平台中讨论的内容,您就会意识到,即使我们只有一个线程,我们仍然可以实现 并发性,得益于 Node.js 的非阻塞特性。 事实上,在这种情况下,“并行”一词使用不当,因为它并不意味着任务同时运行,而是意味着它们的执行是由底层的非阻塞 API 执行并由事件循环交错执行。

如您所知,当任务请求新的异步操作时,它会将控制权交还给事件循环,从而允许事件循环执行另一个任务。 描述这种流程的正确词是并发,但为了简单起见,我们仍将使用并行。

下图展示了两个异步任务如何在 Node.js 程序中并行运行:

image 2024 05 06 16 04 05 103
Figure 3. 图 4.3:异步任务如何并行运行的示例

在图 4.3 中,我们有一个执行两个异步任务的 Main 函数:

  1. Main 函数触发任务 1 和任务 2 的执行。当它们触发异步操作时,它们立即将控制权返回给 Main 函数,然后 Main 函数返回它到事件循环。

  2. 当任务 1 的异步操作完成后,事件循环将控制权交给它。当任务 1 也完成其内部同步处理时,它会通知 Main 函数。

  3. 当任务 2 触发的异步操作完成时,事件循环调用其回调,将控制权交还给任务 2。在任务 2 结束时,再次通知 Main 函数。 此时,Main 函数知道任务 1 和任务 2 都已完成,因此它可以继续执行或将操作结果返回给另一个回调。

简而言之,这意味着在 Node.js 中,我们只能并行执行异步操作,因为它们的并发性是由非阻塞 API 在内部处理的。 在 Node.js 中,同步(阻塞)操作不能同时运行,除非它们的执行与异步操作交错,或者与 setTimeout() 或 setImmediate() 交错。 您将在第 11 章 “高级食谱” 中更详细地看到这一点。

网络蜘蛛版本3

我们的网络蜘蛛应用程序似乎是应用并行执行概念的完美候选者。 到目前为止,我们的应用程序正在以顺序方式执行链接页面的递归下载。 我们可以通过并行下载所有链接的页面来轻松提高此过程的性能。

为此,我们只需要修改 spiderLinks() 函数以确保我们立即生成所有 spider() 任务,然后仅当所有任务都完成执行时才调用最终回调。 因此,让我们修改 SpiderLinks() 函数,如下所示:

function spiderLinks (currentUrl, body, nesting, cb) {
    if (nesting === 0) {
        return process.nextTick(cb)
    }

    const links = getPageLinks(currentUrl, body)
    if (links.length === 0) {
        return process.nextTick(cb)
    }

    let completed = 0
    let hasErrors = false

    function done (err) {
        if (err) {
            hasErrors = true
            return cb(err)
        }
        if (++completed === links.length && !hasErrors) {
            return cb()
        }
    }

    links.forEach(link => spider(link, nesting - 1, done))
}

让我们讨论一下我们改变了什么。 如前所述,spider() 任务现在一次性启动。 这可以通过简单地迭代 links 数组并启动每个任务而无需等待前一个任务完成来实现:

links.forEach(link => spider(link, nesting - 1, done))

然后,让我们的应用程序等待所有任务完成的技巧是为 spider() 函数提供一个特殊的回调,我们称之为 done()。 当蜘蛛任务完成时,done() 函数会增加一个计数器。 当完成的下载数量达到 links 数组的大小时,调用最终回调:

function done (err) {
    if (err) {
        hasErrors = true
        return cb(err)
    }
    if (++completed === links.length && !hasErrors) {
        return cb()
    }
}

hasErrors 变量是必要的,因为如果一个并行任务失败,我们希望立即调用带有给定错误的回调。 此外,我们需要确保可能仍在运行的其他并行任务不会再次调用回调。

完成这些更改后,如果我们现在尝试在网页上运行蜘蛛程序,我们会注意到整个过程的速度有了巨大的提高,因为每次下载都将并行执行,而无需等待上一个链接被处理。

模式

最后,我们可以为并行执行流程提取漂亮的小模式。 让我们用以下代码表示该模式的通用版本:

const tasks = [ /* ... */ ]

let completed = 0
tasks.forEach(task => {
    task(() => {
        if (++completed === tasks.length) {
            finish()
        }
    })
})

function finish () {
    // all the tasks completed
}

通过小的修改,我们可以调整该模式,将每个任务的结果累积到一个集合中,过滤或映射数组的元素,或者在一个或给定数量的任务完成后立即调用 finish() 回调( 最后一种情况被称为竞争性竞赛)。

无限并行执行模式

通过一次启动所有异步任务来并行运行一组异步任务,然后通过计算调用回调的次数来等待所有任务完成。

当多个任务并行运行时,可能会出现竞争条件,即争用访问外部资源(例如数据库中的文件或记录)。 在下一节中,我们将讨论 Node.js 中的竞争条件并探索一些识别和解决它们的技术。

修复并发任务的竞争条件

将阻塞 I/O 与多个线程结合使用时,并行运行一组任务可能会导致问题。 然而,您刚刚已经看到,在 Node.js 中,这是一个完全不同的故事。 事实上,并行运行多个异步任务在资源方面是简单且廉价的。

这是 Node.js 最重要的优势之一,因为它使并行化成为一种常见的做法,而不是一种仅在绝对必要时使用的复杂技术。

Node.js 并发模型的另一个重要特征是我们处理任务同步和竞争条件的方式。 在多线程编程中,这通常是使用锁、互斥体、信号量和监视器等结构来完成的,它可能是并行化最复杂的方面之一,并且对性能有相当大的影响。 在 Node.js 中,我们通常不需要花哨的同步机制,因为一切都在单个线程上运行。 然而,这并不意味着我们不能有竞争条件; 相反,它们可能相当常见。 问题的根源在于异步操作的调用和其结果的通知之间的延迟。

为了看一个具体的例子,我们将再次参考我们的网络蜘蛛应用程序,特别是我们创建的最后一个版本,它实际上包含一个竞争条件(你能发现它吗?)。我们讨论的问题在于 spider() 函数,在开始下载相应的 URL 之前,我们会检查文件是否已经存在:

export function spider (url, nesting, cb) {
    const filename = urlToFilename(url)
    fs.readFile(filename, 'utf8', (err, fileContent) => {
        if (err) {
            if (err.code !== 'ENOENT') {
                return cb(err)
            }
            return download(url, filename, (err, requestContent) => {
            // ...

问题在于,在同一 URL 上运行的两个蜘蛛任务可能会调用 fs。在两个任务之一完成下载并创建文件之前对同一文件调用 readFile(),导致两个任务开始下载。图4.4 解释了这种情况:

image 2024 05 06 16 52 17 717
Figure 4. 图 4.4:spider() 函数中的竞争条件示例

图 4.4 显示了任务 1 和任务 2 如何在 Node.js 的单线程中交错,以及异步操作如何实际上引入竞争条件。在我们的例子中,两个蜘蛛任务最终下载相同的文件。

我们该如何解决这个问题?答案比您想象的要简单得多。事实上,我们所需要的只是一个变量来相互排除在同一 URL 上运行的多个 Spider() 任务。 这可以通过一些代码来实现,例如:

const spidering = new Set()
function spider (url, nesting, cb) {
    if (spidering.has(url)) {
        return process.nextTick(cb)
    }
    spidering.add(url)
    // ...

该修复不需要太多注释。 如果给定的 url 已经存在于抓取集中,我们只需立即退出该函数即可; 否则,我们将 url 添加到集合中并继续下载。 在我们的例子中,我们不需要释放锁,因为我们对下载一个 URL 两次不感兴趣,即使蜘蛛任务是在两个完全不同的时间点执行的。 如果您正在构建一个可能需要下载数十万个网页的蜘蛛,那么在下载文件后从集合中删除下载的 url 将帮助您保持设置的基数,从而避免内存消耗无限增长。

即使我们处于单线程环境中,竞争条件也会导致许多问题。 在某些情况下,它们可能会导致数据损坏,并且由于其短暂性,通常很难调试。 因此,在并行运行任务时仔细检查这些类型的情况始终是一个好习惯。

此外,运行任意数量的并行任务可能是一种危险的做法。 在下一节中,您将了解为什么它会成为问题以及如何控制并行任务的数量。

有限的并行执行

在没有控制的情况下生成并行任务通常会导致负载过大。 想象一下要读取数千个文件、访问 URL 或并行运行数据库查询。 这种情况下的一个常见问题是资源耗尽。 最常见的示例是应用程序尝试一次打开太多文件,利用进程可用的所有文件描述符。

生成无限并行任务来处理用户请求的服务器可能会遭到拒绝服务 (DoS) 攻击。 即恶意行为者可以伪造一个或多个请求来促使服务器消耗所有可用资源并变得无响应。 一般来说,限制并行任务的数量是一种有助于构建弹性应用程序的良好实践。

我们的网络蜘蛛版本 3 不限制并行任务的数量,因此,它在许多情况下很容易崩溃。 例如,如果我们尝试在一个非常大的网站上运行它,我们可能会看到它运行了几秒钟,然后失败并显示错误代码 ECONNREFUSED。 当我们从 Web 服务器同时下载太多页面时,服务器可能会决定开始拒绝来自同一 IP 的新连接。 在这种情况下,我们的蜘蛛只会崩溃,如果我们想继续爬行网站,我们将被迫重新启动该进程。 我们可以处理 ECONNREFUSED 来阻止进程崩溃,但我们仍然会面临分配太多并行任务的风险,并且可能会遇到其他问题。

在本节中,您将看到我们如何通过限制并发来使我们的蜘蛛更具弹性。

下图显示了一种情况,其中有五个并行运行的任务,并发限制为两个:

image 2024 05 06 17 06 02 196
Figure 5. 图 4.5:如何将并发限制为最多两个并行任务的示例

从图 4.5 中,我们应该清楚我们的算法是如何工作的:

  1. 最初,我们在不超过并发限制的情况下生成尽可能多的任务。

  2. 然后,每次任务完成时,我们都会生成一个或多个任务,直到再次达到限制。

在下一节中,我们将探讨有限并行执行模式的可能实现。

限制并发数

现在我们将研究一种以有限并发性并行执行一组给定任务的模式:

const tasks = [
    // ...
]

const concurrency = 2
let running = 0
let completed = 0
let index = 0

function next () { // (1)
    while (running < concurrency && index < tasks.length) {
        const task = tasks[index++]
        task(() => { // (2)
            if (++completed === tasks.length) {
                return finish()
            }
            running--
            next()
        })
        running++
    }
}
next()

function finish() {
    // all tasks finished
}

该算法可以被认为是顺序执行和并行执行的混合。 事实上,您可能会注意到这两种模式的相似之处:

  1. 我们有一个迭代器函数,我们将其称为 next(),然后是一个内部循环,该循环在保持并发限制的情况下并行生成尽可能多的任务。

  2. 下一个重要部分是我们传递给每个任务的回调,它检查我们是否完成了列表中的所有任务。 如果仍有任务要运行,它会调用 next() 来生成另一组任务。

很简单,不是吗?

全局限制并发数

我们的网络蜘蛛应用程序非常适合应用我们刚刚学到的有关限制一组任务的并发性的知识。 事实上,为了避免同时抓取数千个链接的情况,我们可以通过添加有关并发下载数量的一些可预测性来强制限制此过程的并发性。

我们可以将有限并发模式的这种实现应用于 SpiderLinks() 函数,但通过这样做,我们只会限制从给定页面中找到的链接生成的任务的并发性。 例如,如果我们选择两个并发,则每个页面最多会并行下载两个链接。 然而,由于我们可以一次下载多个链接,因此每个页面都会产生另外两次下载,从而导致下载操作总量呈指数级增长。

一般来说,当我们有一组预定的任务要执行,或者当任务组随时间线性增长时,这种有限并发模式的实现效果非常好。 相反,当一个任务可以直接生成两个或多个任务时(就像我们的网络蜘蛛一样),此实现不适合限制全局并发。

Queues to the rescue

那么,我们真正想要的是限制我们可以并行运行的下载操作的全局数量。 我们可以稍微修改上一节中显示的模式,但这留给您作为练习。 相反,让我们讨论另一种利用队列来限制多个任务并发的机制。 让我们看看这是如何工作的。

我们现在将实现一个名为 TaskQueue 的简单类,它将队列与讨论有限并发时提出的算法结合起来。 让我们创建一个名为 taskQueue.js 的新模块:

export class TaskQueue {
    constructor (concurrency) {
        this.concurrency = concurrency
        this.running = 0
        this.queue = []
    }

    pushTask (task) {
        this.queue.push(task)
        process.nextTick(this.next.bind(this))
        return this
    }

    next () {
        while (this.running < this.concurrency && this.queue.length) {
            const task = this.queue.shift()
            task(() => {
                this.running--
                process.nextTick(this.next.bind(this))
            })
            this.running++
        }
    }
}

该类的构造函数只接受并发限制作为输入,除此之外,它还初始化了实例变量 running 和 queue。前一个变量是一个计数器,用于跟踪所有正在运行的任务,而后一个变量是一个数组,将用作队列来存储待处理的任务。

pushTask() 方法只是将一个新任务添加到队列中,然后通过异步调用 this.next() 来引导 Worker 的执行。请注意,我们必须使用绑定,否则当 process.nextTick 调用 next 函数时,它将丢失上下文。

next() 方法会从队列中生成一组任务,确保不会超过并发限制。

您可能会注意到,这种方法与 “限制并发量” 部分开头介绍的模式有一些相似之处。它主要是在不超过并发限制的情况下,从队列中启动尽可能多的任务。每个任务完成后,它会更新正在运行的任务数,然后再次异步调用 next() 开始新一轮任务。TaskQueue 类的有趣之处在于,它允许我们动态地向队列中添加新项目。另一个优点是,现在我们有了一个中心实体,负责限制任务的并发性,它可以在函数执行的所有实例中共享。在我们的例子中,它就是 spider() 函数,稍后您就会看到。

完善任务队列

TaskQueue 的先前实现足以演示队列模式,但为了在实际项目中使用,它需要一些额外的功能。 例如,我们如何判断其中一项任务失败了? 我们如何知道队列中的所有工作是否都已完成?

让我们回顾一下我们在第 3 章回调和事件中讨论的一些概念,然后将 TaskQueue 转换为 EventEmitter,以便我们可以发出事件来传播任务失败并在队列为空时通知任何观察者。

我们要做的第一个更改是导入 EventEmitter 类并让我们的 TaskQueue 扩展它:

import { EventEmitter } from 'events'

export class TaskQueue extends EventEmitter {
    constructor (concurrency) {
        super()
        // ...
    }
    // ...
}

此时,我们可以使用 this.emit 从 TaskQueue next() 方法中触发事件:

next () {
    if (this.running === 0 && this.queue.length === 0) { // (1)
        return this.emit('empty')
    }
    while (this.running < this.concurrency && this.queue.length) {
        const task = this.queue.shift()
        task((err) => { // (2)
            if (err) {
                this.emit('error', err)
            }
            this.running--
            process.nextTick(this.next.bind(this))
        })
        this.running++
    }
}

与之前的实现相比,这里增加了两处:

  • 每次调用 next() 函数时,我们都会检查是否没有任务正在运行以及队列是否为空。 在这种情况下,这意味着队列已被清空,我们可以触发空事件。

  • 现在可以通过传递错误来调用每个任务的完成回调。 我们检查是否确实传递了错误,表明任务已失败,在这种情况下,我们使用错误事件传播此类错误。

请注意,如果出现错误,我们会故意保持队列运行。 我们不会停止其他正在进行的任务,也不会删除任何待处理的任务。 这在基于队列的系统中很常见。 错误是不可避免会发生的,通常最好是识别错误并考虑重试或恢复策略,而不是让系统在这些情况下崩溃。 我们将在第 13 章 “消息传递和集成模式” 中详细讨论这些概念。

网络蜘蛛版本4

现在我们有了通用队列,可以在有限的并行流程中执行任务,让我们直接用它来重构网络蜘蛛应用程序。

我们将使用 TaskQueue 的一个实例作为工作积压;我们要抓取的每个 URL 都需要作为一个任务添加到队列中。起始 URL 将作为第一个任务添加,然后在抓取过程中发现的每个其他 URL 也将被添加。队列将为我们管理所有的调度工作,确保任何时候正在进行的任务数量(即从文件系统下载或读取的页面数量)都不会超过为给定 TaskQueue 实例配置的并发限制。

我们已经在 spider() 函数中定义了抓取给定 URL 的逻辑。我们可以将其视为通用抓取任务。为了更加清晰,最好将此函数重命名为 spiderTask:

function spiderTask (url, nesting, queue, cb) { // (1)
    const filename = urlToFilename(url)
    fs.readFile(filename, 'utf8', (err, fileContent) => {
        if (err) {
            if (err.code !== 'ENOENT') {
                return cb(err)
            }

            return download(url, filename, (err, requestContent) => {
                if (err) {
                    return cb(err)
                }

                spiderLinks(url, requestContent, nesting, queue) // (2)
                return cb()
            })
        }

        spiderLinks(url, fileContent, nesting, queue) // (3)
        return cb()
    })
}

除了重新命名函数外,您可能还注意到我们还做了其他一些小改动:

  • 函数签名现在接受一个名为 queue 的新参数。这是一个 TaskQueue 的实例,我们需要将其继承过来,以便在必要时添加新任务。

  • 负责为抓取添加新链接的函数是 spiderLinks,因此我们需要确保在下载新页面后调用该函数时传递队列实例。

  • 当我们从一个已下载的文件中调用 spiderLinks 时,也需要将队列实例传递给它。

让我们再来看看 spiderLinks() 函数。这个函数现在可以大大简化,因为它不再需要跟踪任务完成情况,因为这项工作已经委托给了队列。现在,它的工作变成了有效的同步工作;它只需调用新的 spider() 函数(我们很快就会定义)向队列推送一个新任务,每发现一个链接就推送一个新任务:

function spiderLinks (currentUrl, body, nesting, queue) {
    if (nesting === 0) {
        return
    }

    const links = getPageLinks(currentUrl, body)
    if (links.length === 0) {
        return
    }

    links.forEach(link => spider(link, nesting - 1, queue))
}

现在让我们重新回顾一下 Spider() 函数,它需要充当第一个 URL 的入口点; 它还将用于将每个新发现的 URL 添加到队列中:

const spidering = new Set() // (1)
export function spider (url, nesting, queue) {
    if (spidering.has(url)) {
        return
    }

    spidering.add(url)
    queue.pushTask((done) => { // (2)
        spiderTask(url, nesting, queue, done)
    })
}

正如您所看到的,该函数现在有两个主要职责:

  1. 它通过使用爬取集来管理已访问或正在进行的 URL 的簿记。

  2. 将新任务推送到队列中。 一旦执行,该任务将调用 spiderTask() 函数,有效地开始对给定 URL 的爬行。

最后,我们可以更新 Spider-cli.js 脚本,它允许我们从命令行调用我们的蜘蛛:

import { spider } from './spider.js'
import { TaskQueue } from './TaskQueue.js'

const url = process.argv[2] // (1)
const nesting = Number.parseInt(process.argv[3], 10) || 1
const concurrency = Number.parseInt(process.argv[4], 10) || 2

const spiderQueue = new TaskQueue(concurrency) // (2)
spiderQueue.on('error', console.error)
spiderQueue.on('empty', () => console.log('Download complete'))

spider(url, nesting, spiderQueue) // (3)

该脚本现在由三个主要部分组成:

  1. CLI 参数解析。 请注意,该脚本现在接受第三个附加参数,可用于自定义并发级别。

  2. 创建一个 TaskQueue 对象,并将侦听器附加到错误和空事件。 当发生错误时,我们只想打印它。 当队列为空时,这意味着我们已经完成对网站的抓取。

  3. 最后,我们通过调用 spider 函数来开始爬行过程。

应用这些更改后,我们可以尝试再次运行蜘蛛模块。 当我们运行以下命令时:

node spider-cli.js https://loige.co 1 4

我们应该注意到,同时激活的下载不会超过四个。

通过最后一个示例,我们结束了对基于回调的模式的探索。 在下一节中,我们将通过查看一个著名的库来结束本章,该库提供了这些模式和许多其他异步实用程序的生产就绪实现。