队列

介绍

在构建 web 应用程序时,有些任务(例如解析和存储上传的 CSV 文件)可能需要花费很长时间才能完成,这样在常规的 web 请求中执行可能会造成性能问题。幸运的是,Laravel 允许你轻松创建队列任务,这些任务可以在后台处理。通过将耗时的任务移到队列中,你的应用程序能够快速响应 web 请求,提供更好的用户体验。

Laravel 队列为不同的队列后端(如 Amazon SQSRedis,甚至是关系型数据库)提供了统一的队列 API。

Laravel 的队列配置选项存储在应用程序的 config/queue.php 配置文件中。在这个文件中,你将找到与框架中包含的各个队列驱动程序相关的连接配置,包括数据库驱动、 Amazon SQSRedisBeanstalkd 驱动,以及一个会立即执行任务的同步驱动(用于本地开发)。此外,还有一个 null 队列驱动,它会丢弃队列中的任务。

Laravel 现在提供了 Horizon,这是一个美丽的仪表盘和配置系统,专门为 Redis 驱动的队列设计。欲了解更多信息,请查看完整的 【Horizon 文档】。

连接与队列

在开始使用 Laravel 队列之前,理解 “连接” 和 “队列” 之间的区别非常重要。在你的 config/queue.php 配置文件中,有一个 connections 配置数组。这个选项定义了与后台队列服务(如 Amazon SQS、Beanstalk 或 Redis)连接的配置。然而,任何给定的队列连接可能会有多个 “队列”,这些队列可以理解为不同的任务堆栈或堆积。

请注意,在队列配置文件中的每个连接配置示例都包含一个 queue 属性。这个属性定义了任务被派发时,默认发送到哪个队列。换句话说,如果你派发任务时没有明确指定应该发送到哪个队列,任务将会被放置到连接配置中定义的默认队列:

use App\Jobs\ProcessPodcast;

// 这个任务会被发送到默认连接的默认队列...
ProcessPodcast::dispatch();

// 这个任务会被发送到默认连接的 "emails" 队列...
ProcessPodcast::dispatch()->onQueue('emails');

有些应用程序可能不需要将任务推送到多个队列,而是倾向于使用一个简单的队列。然而,将任务推送到多个队列对于希望优先处理或对任务处理进行分段的应用程序特别有用,因为 Laravel 队列工作器允许你指定应该按优先级处理哪些队列。例如,如果你将任务推送到一个高优先级队列,你可以运行一个工作进程,赋予该队列更高的处理优先级:

php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

为了使用数据库队列驱动,你需要一个数据库表来存储任务。通常,Laravel 的默认迁移文件 0001_01_01_000002_create_jobs_table.php 包含了这个表的创建;如果你的应用程序中没有这个迁移文件,你可以使用 make:queue-table Artisan 命令来创建它:

php artisan make:queue-table

php artisan migrate

Redis

为了使用 Redis 队列驱动,你需要在 config/database.php 配置文件中配置一个 Redis 数据库连接。

Redis 队列驱动不支持 serializercompression 选项。

Redis 集群

如果你的 Redis 队列连接使用 Redis 集群,你的队列名称必须包含一个键【哈希标签】。这是必需的,以确保给定队列的所有 Redis 键都被放置在相同的哈希槽中:

'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', '{default}'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => null,
    'after_commit' => false,
],

阻塞模式

当使用 Redis 队列时,你可以使用 block_for 配置选项来指定驱动程序应该等待多长时间,直到任务可用,然后才会进入工作循环并重新轮询 Redis 数据库。

根据队列负载调整此值,比起不断轮询 Redis 数据库查找新任务,可能会更加高效。例如,你可以将值设置为 5,表示驱动程序应在等待任务可用时阻塞 5 秒:

'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', 'default'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => 5,
    'after_commit' => false,
],

block_for 设置为 0 会导致队列工作者在任务可用之前无限期阻塞。这样会阻止如 SIGTERM 这样的信号在下一个任务处理之前被处理。

其它驱动程序的先决条件

以下是一些队列驱动所需的依赖项。这些依赖项可以通过 Composer 包管理器进行安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0

  • Beanstalkd: pda/pheanstalk ~5.0

  • Redis: predis/predis ~2.0phpredis PHP 扩展

  • MongoDB: mongodb/laravel-mongodb

创建任务

生成任务类

默认情况下,你的应用程序中的所有可排队任务都会存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,在你运行 make:job Artisan 命令时会自动创建该目录:

php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,告诉 Laravel 该任务应该被推送到队列中以异步执行。

你可以通过【发布存根】来定制任务的存根文件。

类结构

作业类非常简单,通常只包含一个 handle 方法,当作业由队列处理时,该方法会被调用。为了更好地理解,让我们看一个示例作业类。在这个示例中,我们假设我们管理一个播客发布服务,需要在发布之前处理上传的播客文件:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的作业实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行作业。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }
}

在这个示例中,注意我们能够直接将 【Eloquent 模型】传递给排队作业的构造函数。由于作业使用了 Queueable 特性,Eloquent 模型及其已加载的关系会在处理作业时被优雅地序列化和反序列化。

如果你的排队作业接受 Eloquent 模型作为构造函数的参数,只有模型的标识符会被序列化到队列中。当作业被处理时,队列系统会自动从数据库重新获取完整的模型实例及其已加载的关系。通过这种方式,模型的序列化允许队列载荷更小。

handle 方法的依赖注入

handle 方法是在作业通过队列处理时被调用的。注意,我们能够在作业的 handle 方法上进行类型提示,Laravel 服务容器会自动注入这些依赖项。

如果你希望完全控制容器如何注入依赖项到 handle 方法中,可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收作业和容器。在回调内,你可以自由地调用 handle 方法。通常,你应该在 App\Providers\AppServiceProvider 服务提供者的 boot 方法中调用此方法:

use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

像原始图像内容等二进制数据,在传递给排队作业之前应该先通过 base64_encode 函数进行编码,否则在放置到队列时作业可能无法正确序列化为 JSON。

排队关系

由于所有加载的 Eloquent 模型关系也会在作业排队时被序列化,因此序列化的作业字符串有时可能会变得非常大。此外,当作业被反序列化并且模型关系从数据库中重新获取时,它们会被完全获取。反序列化时,任何之前应用到模型的关系约束将不会再应用。因此,如果你希望只操作给定关系的子集,你应该在排队作业中重新约束该关系。

或者,为了防止关系被序列化,你可以在设置属性值时调用模型的 withoutRelations 方法。这个方法会返回一个不包含已加载关系的模型实例:

/**
 * 创建一个新的作业实例。
 */
public function __construct(
    Podcast $podcast,
) {
    $this->podcast = $podcast->withoutRelations();
}

如果你使用 PHP 的构造函数属性提升,并希望指示一个 Eloquent 模型不应序列化其关系,可以使用 WithoutRelations 属性:

use Illuminate\Queue\Attributes\WithoutRelations;

/**
 * 创建一个新的作业实例。
 */
public function __construct(
    #[WithoutRelations]
    public Podcast $podcast,
) {}

如果一个作业接收的是 Eloquent 模型的集合或数组,而不是单个模型,则在作业被反序列化并执行时,集合中的模型将不会恢复其关系。这是为了防止在处理大量模型的作业中占用过多的资源。

唯一任务

唯一的任务需要一个支持【锁】的缓存驱动。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动都支持原子锁。此外,唯一任务的约束不适用于批次中的任务。

有时,您可能希望确保在任何时间点,队列中只有一个特定任务的实例。您可以通过在任务类上实现 ShouldBeUnique 接口来实现这一点。此接口不需要您在类中定义任何额外的方法:

<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...
}

在上面的示例中,UpdateSearchIndex 任务是唯一的。因此,如果队列中已有该任务的实例并且尚未完成处理,则不会调度该任务。

在某些情况下,您可能希望定义一个特定的 “键” 来使任务唯一,或者希望指定一个超时值,超时后任务不再保持唯一。为此,您可以在任务类中定义 uniqueIduniqueFor 属性或方法:

<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * The product instance.
     *
     * @var \App\Product
     */
    public $product;

    /**
     * The number of seconds after which the job's unique lock will be released.
     *
     * @var int
     */
    public $uniqueFor = 3600;

    /**
     * Get the unique ID for the job.
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

在上面的示例中,UpdateSearchIndex 任务通过产品 ID 来唯一化。因此,任何带有相同产品 ID 的任务调度都将被忽略,直到现有任务完成处理。此外,如果现有任务未在一小时内完成处理,则唯一锁将被释放,且可以调度一个具有相同唯一键的新任务。

如果您的应用程序从多个 Web 服务器或容器调度任务,您应确保所有服务器都与同一个中央缓存服务器进行通信,这样 Laravel 才能准确地判断任务是否唯一。

在处理开始之前保持任务唯一

默认情况下,唯一任务会在任务完成处理或失败所有重试后解锁。然而,有时您希望任务在处理之前立即解锁。为此,您的任务应该实现 ShouldBeUniqueUntilProcessing 合同,而不是 ShouldBeUnique 合同:

<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

唯一任务锁

在后台,当调度 ShouldBeUnique 任务时,Laravel 会尝试获取带有 uniqueId 键的锁。如果未能获取锁,任务将不会被调度。该锁会在任务完成处理或失败所有重试后释放。默认情况下,Laravel 会使用默认缓存驱动程序来获取此锁。不过,如果您希望使用其他驱动程序来获取锁,可以定义一个 uniqueVia 方法,该方法返回应该用于获取锁的缓存驱动程序:

use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...

    /**
     * Get the cache driver for the unique job lock.
     */
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}

如果您只需要限制任务的并发处理,请改用 WithoutOverlapping 任务中间件。

加密任务

Laravel 允许你通过加密来确保任务数据的隐私性和完整性。要开始使用,只需将 ShouldBeEncrypted 接口添加到任务类中。一旦该接口添加到类中,Laravel 会在将任务推送到队列之前自动对任务进行加密:

<?php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
    // ...
}

任务中间件

任务中间件允许你在队列任务的执行过程中封装自定义逻辑,从而减少任务本身的样板代码。例如,考虑以下 handle 方法,它利用 Laravel 的 Redis 限流特性,每 5 秒钟只允许一个任务执行:

use Illuminate\Support\Facades\Redis;

/**
 * 执行任务。
 */
public function handle(): void
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('锁定成功...');

        // 处理任务...
    }, function () {
        // 无法获取锁...

        return $this->release(5);
    });
}

虽然这段代码是有效的,但 handle 方法的实现变得很冗长,因为它充斥着 Redis 限流逻辑。此外,这些限流逻辑需要在其它想要进行限流的任务中重复编写。

与其在 handle 方法中进行限流,我们可以定义一个任务中间件来处理限流逻辑。Laravel 并没有为任务中间件指定默认位置,所以你可以将任务中间件放置在应用程序中的任何位置。在这个例子中,我们将中间件放置在 app/Jobs/Middleware 目录下:

<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * 处理队列任务。
     *
     * @param  \Closure(object): void  $next
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
                ->block(0)->allow(1)->every(5)
                ->then(function () use ($job, $next) {
                    // 锁定成功...

                    $next($job);
                }, function () use ($job) {
                    // 无法获取锁...

                    $job->release(5);
                });
    }
}

正如你所看到的,和【路由中间件】一样,任务中间件接收正在处理的任务和一个回调,回调需要在任务处理完成后继续执行。

创建任务中间件后,你可以通过在任务的 middleware 方法中返回它们,将中间件附加到任务上。需要注意的是,这个 middleware 方法在使用 make:job Artisan 命令生成的任务中并不存在,所以你需要手动将它添加到任务类中:

use App\Jobs\Middleware\RateLimited;

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited];
}

任务中间件也可以分配给可排队的事件监听器、邮件和通知。

速率限制

虽然我们刚刚演示了如何编写自己的限流任务中间件,Laravel 实际上包含了一个可以用来限流任务的中间件。与路由限流器类似,任务限流器是通过 RateLimiter 门面中的 for 方法来定义的。

例如,你可能希望允许用户每小时备份一次数据,但对付费用户不加限制。为了实现这一点,你可以在 AppServiceProviderboot 方法中定义一个 RateLimiter:

use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * 启动应用程序服务。
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        return $job->user->vipCustomer()
                    ? Limit::none()
                    : Limit::perHour(1)->by($job->user->id);
    });
}

在上面的例子中,我们定义了一个每小时的限流;不过,你也可以使用 perMinute 方法定义基于分钟的限流。此外,你可以将任何值传递给限流的 by 方法;不过,这个值通常用来根据客户进行限流分段:

return Limit::perMinute(50)->by($job->user->id);

一旦定义了限流,你可以通过 Illuminate\Queue\Middleware\RateLimited 中间件将限流器附加到任务上。每次任务超过限流时,这个中间件会根据限流的持续时间,将任务释放回队列,并附加适当的延迟。

use Illuminate\Queue\Middleware\RateLimited;

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited('backups')];
}

将一个限流的任务释放回队列时,任务的总尝试次数仍然会增加。你可能需要相应地调整任务类中的 triesmaxExceptions 属性。或者,你可以使用 retryUntil 方法来定义任务不再尝试的时间。

如果你不希望任务在限流时被重试,可以使用 dontRelease 方法:

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->dontRelease()];
}

如果你使用 Redis,你可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,它是针对 Redis 进行了优化,比基础的限流中间件更高效。

防止任务重叠

Laravel 包含了一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,允许你根据任意的键来防止任务的重叠。当一个队列任务正在修改某个资源时,这个功能非常有用,特别是当你希望同一时间只有一个任务能够修改该资源时。

例如,假设你有一个队列任务用来更新用户的信用评分,并且你希望防止同一个用户 ID 的信用评分更新任务发生重叠。为此,你可以在任务的 middleware 方法中返回 WithoutOverlapping 中间件:

use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}

任何相同类型的重叠任务都会被释放回队列。你还可以指定在任务被释放后,多少秒后该任务将重新尝试执行:

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

如果你希望立即删除任何重叠任务,以避免它们被重试,可以使用 dontRelease 方法:

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping 中间件是通过 Laravel 的原子锁功能来实现的。有时候,任务可能会意外失败或超时,导致锁未被释放。因此,你可以显式地定义一个锁的过期时间,使用 expireAfter 方法。例如,下面的代码会在任务开始处理后的三分钟内指示 Laravel 释放 WithoutOverlapping 锁:

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}

WithoutOverlapping 中间件需要一个支持【锁】的缓存驱动。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动都支持原子锁。

在任务类之间共享锁键

默认情况下,WithoutOverlapping 中间件只会防止相同类的任务重叠。因此,即使两个不同的任务类使用相同的锁键,它们也不会被防止重叠。然而,你可以通过 shared 方法指示 Laravel 在不同任务类之间共享锁键:

use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

class ProviderIsUp
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

通过这种方式,ProviderIsDownProviderIsUp 两个不同的任务类都将共享相同的锁键,从而避免它们之间的重叠。

限制异常

Laravel 包含了一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许你对异常进行限速。一旦任务抛出指定数量的异常,所有进一步的任务执行将被延迟,直到指定的时间间隔过去。这个中间件对于与不稳定的第三方服务交互的任务尤其有用。

例如,假设你有一个与第三方 API 交互的队列任务,该 API 开始抛出异常。为了限速这些异常,你可以在任务的 middleware 方法中返回 ThrottlesExceptions 中间件。通常,这个中间件应与实现【基于时间的尝试次数】的任务一起使用:

use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new ThrottlesExceptions(10, 5 * 60)];
}

/**
 * 确定任务的超时时间。
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(30);
}

此中间件的第一个构造函数参数是任务可以抛出的异常数量,超过该数量后任务将被限速。第二个构造函数参数是当任务被限速后,任务将在多少秒后再次尝试执行。在上面的示例中,如果任务连续抛出 10 个异常,我们将等待 5 分钟后再尝试执行该任务,受限于 30 分钟的时间限制。

当任务抛出异常,但异常阈值尚未达到时,任务通常会立即重试。但是,你可以通过在将中间件附加到任务时调用 backoff 方法来指定任务重试时的延迟时间:

use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}

内部,这个中间件使用 Laravel 的缓存系统来实现速率限制,且任务的类名被用作缓存的 “键”。你可以通过在附加中间件时调用 by 方法来覆盖此键。如果你有多个任务与同一个第三方服务交互,并且希望它们共享一个公共的限速 “桶”,那么这个功能会很有用:

use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}

默认情况下,该中间件将限速所有异常。你可以通过在附加中间件时调用 when 方法来修改此行为。只有当传递给 when 方法的闭包返回 true 时,异常才会被限速:

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->when(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

如果你希望将限速的异常报告给应用程序的异常处理程序,可以通过在附加中间件时调用 report 方法来实现。你还可以选择为 report 方法提供一个闭包,只有当闭包返回 true 时,异常才会被报告:

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取任务通过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->report(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

如果你使用 Redis,可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,它针对 Redis 进行了优化,比基本的异常限速中间件更高效。

跳过任务

Skip 中间件允许你指定任务应该被跳过或删除,而无需修改任务的逻辑。如果给定的条件为 trueSkip::when 方法将删除该任务,而 Skip::unless 方法则会在条件为 false 时删除任务:

use Illuminate\Queue\Middleware\Skip;

/**
 * 获取任务通过的中间件。
 */
public function middleware(): array
{
    return [
        Skip::when($someCondition),
    ];
}

你还可以将一个闭包传递给 whenunless 方法,以进行更复杂的条件评估:

use Illuminate\Queue\Middleware\Skip;

/**
 * 获取任务通过的中间件。
 */
public function middleware(): array
{
    return [
        Skip::when(function (): bool {
            return $this->shouldSkip();
        }),
    ];
}

分派任务

一旦你编写了任务类,你可以使用任务本身的 dispatch 方法来调度它。传递给 dispatch 方法的参数将被传递给任务的构造函数:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);

        return redirect('/podcasts');
    }
}

如果你想要有条件地调度任务,可以使用 dispatchIfdispatchUnless 方法:

ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用中,sync 驱动程序是默认的队列驱动。这个驱动程序会在当前请求的前台同步执行任务,这在本地开发时通常很方便。如果你希望实际开始将任务排入队列并进行后台处理,你可以在应用的 config/queue.php 配置文件中指定一个不同的队列驱动。

延迟分派

如果你希望指定任务在队列工作者处理之前不能立即可用,可以在调度任务时使用 delay 方法。例如,我们可以指定一个任务在被调度后 10 分钟才开始处理:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast)
                    ->delay(now()->addMinutes(10));

        return redirect('/podcasts');
    }
}

在某些情况下,任务可能有一个默认的延迟时间。如果你需要绕过这个延迟,并立即调度一个任务进行处理,可以使用 withoutDelay 方法:

ProcessPodcast::dispatch($podcast)->withoutDelay();

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

在响应发送到浏览器后再调度任务

另外,dispatchAfterResponse 方法会延迟调度任务,直到 HTTP 响应发送到用户的浏览器(如果你的 Web 服务器使用 FastCGI)。即使队列任务仍在执行,用户也可以继续使用应用程序。通常,这应仅用于处理大约 1 秒钟的任务,例如发送电子邮件。因为这些任务是在当前 HTTP 请求中处理的,所以通过这种方式调度的任务不需要队列工作者运行:

use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

你还可以调度一个闭包,并在 dispatch 辅助方法后链式调用 afterResponse 方法,以便在 HTTP 响应发送到浏览器后执行一个闭包:

use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;

dispatch(function () {
    Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同步分派

如果你希望立即调度一个任务(同步执行),可以使用 dispatchSync 方法。使用此方法时,任务不会被加入队列,而是会在当前进程中立即执行:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatchSync($podcast);

        return redirect('/podcasts');
    }
}

任务与数据库事务

虽然在数据库事务中调度任务是完全可以的,但你需要特别注意确保任务能够成功执行。当在事务中调度任务时,可能会发生任务在父事务提交之前被工作者处理的情况。发生这种情况时,你在数据库事务中对模型或数据库记录所做的更新可能尚未反映到数据库中。此外,在事务中创建的任何模型或数据库记录可能还不存在于数据库中。

幸运的是,Laravel 提供了几种方法来解决这个问题。首先,你可以在队列连接的配置数组中设置 after_commit 选项:

'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

after_commit 选项设置为 true 时,你可以在数据库事务中调度任务;然而,Laravel 会等到父数据库事务提交后才实际调度任务。当然,如果当前没有打开的数据库事务,任务将会立即调度。

如果由于事务中的异常导致事务被回滚,那么在该事务中调度的任务将会被丢弃。

after_commit 配置选项设置为 true 还会导致任何排队的事件监听器、邮件发送、通知和广播事件在所有打开的数据库事务提交之后才被调度。

内联指定提交后调度行为

如果你没有将 after_commit 队列连接配置选项设置为 true,你仍然可以指示某个特定的任务在所有打开的数据库事务提交后再调度。为此,你可以在调度操作中链式调用 afterCommit 方法:

use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果 after_commit 配置选项设置为 true,你也可以指示某个特定的任务立即调度,而无需等待任何打开的数据库事务提交:

ProcessPodcast::dispatch($podcast)->beforeCommit();

任务链式执行

任务链允许你指定一系列在主任务成功执行后依次运行的队列任务。如果链中的某个任务失败,后续的任务将不会被执行。要执行队列任务链,可以使用 Bus facade 提供的 chain 方法。Laravel 的命令总线是一个较低级的组件,队列任务调度是建立在其之上的:

use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了链接任务类实例外,你还可以链接闭包:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();

在任务中使用 $this->delete() 方法删除任务不会阻止链式任务的执行。链式任务只有在某个任务失败时才会停止执行。

链式任务的连接和队列

如果你希望指定用于链式任务的连接和队列,可以使用 onConnectiononQueue 方法。这些方法指定了应使用的队列连接和队列名称,除非显式地为队列任务分配了不同的连接/队列:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

向任务链中添加任务

有时你可能需要在任务链中的另一个任务执行时,向现有链中添加任务。可以使用 prependToChainappendToChain 方法来实现:

/**
 * 执行任务
 */
public function handle(): void
{
    // ...

    // 在当前链的前面添加任务,立即在当前任务之后执行...
    $this->prependToChain(new TranscribePodcast);

    // 在当前链的末尾添加任务,任务将在链末尾执行...
    $this->appendToChain(new TranscribePodcast);
}

链式任务失败

在链式任务中,你可以使用 catch 方法指定一个回调,在链中的某个任务失败时调用。给定的回调将接收导致任务失败的 Throwable 实例:

use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // 链中的某个任务失败了...
})->dispatch();

由于链式回调是序列化的,并由 Laravel 队列在稍后的时间执行,因此在链式回调中不应使用 $this 变量。

自定义队列和连接

分派到特定队列

通过将任务推送到不同的队列,你可以对队列任务进行 “分类”,甚至优先分配给不同队列的工作进程。请注意,这并不会将任务推送到你的队列配置文件中定义的不同队列“连接”,而只是将任务推送到单一连接中的特定队列。要指定队列,可以在调度任务时使用 onQueue 方法:

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建 podcast...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        return redirect('/podcasts');
    }
}

另外,你也可以通过在任务的构造函数中调用 onQueue 方法来指定任务的队列:

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建新的任务实例。
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

分派到特定连接

如果你的应用程序与多个队列连接进行交互,可以使用 onConnection 方法来指定将任务推送到哪个连接:

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建 podcast...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');

        return redirect('/podcasts');
    }
}

你还可以将 onConnectiononQueue 方法连用,来指定任务的连接和队列:

ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

另外,你也可以通过在任务的构造函数中调用 onConnection 方法来指定任务的连接:

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建新的任务实例。
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

指定最大任务尝试次数 / 超时值

最大重试次数

如果你的队列任务遇到错误,你可能不希望它无限制地重试。因此,Laravel 提供了多种方法来指定一个任务可以尝试的次数或尝试的时间。

一种指定任务最大尝试次数的方法是通过 Artisan 命令行的 --tries 参数。这将适用于所有由工作进程处理的任务,除非任务本身指定了最大尝试次数:

php artisan queue:work --tries=3

如果任务超过最大尝试次数,它将被视为 “失败的任务”。有关处理失败任务的更多信息,请参考 失败任务文档。如果 --tries=0 被提供给 queue:work 命令,任务将会无限重试。

你可以在任务类中定义最大尝试次数。如果在任务类中指定了最大尝试次数,它将优先于命令行中提供的 --tries 参数:

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务可以尝试的最大次数
     *
     * @var int
     */
    public $tries = 5;
}

如果你需要对某个特定任务的最大尝试次数进行动态控制,可以在任务类中定义 tries 方法:

/**
 * 确定任务可以尝试的最大次数
 */
public function tries(): int
{
    return 5;
}

基于时间的重试

除了定义任务在失败之前可以尝试的最大次数外,你还可以定义任务在何时不再尝试,这允许任务在给定的时间范围内尝试任意次数。要定义任务的超时时间,可以在任务类中添加 retryUntil 方法,该方法应该返回一个 DateTime 实例:

use DateTime;

/**
 * 确定任务的超时时间
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(10);
}

你还可以在【排队的事件监听器】中定义 tries 属性或 retryUntil 方法。

最大异常次数

有时你可能希望指定任务可以多次尝试,但如果重试是由于特定数量的未处理异常触发的(而不是通过 release 方法直接释放任务),则任务应该失败。为此,你可以在任务类中定义一个 maxExceptions 属性:

namespace App\Jobs;

use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务可以尝试的最大次数
     *
     * @var int
     */
    public $tries = 25;

    /**
     * 允许的最大未处理异常次数,超过此次数则任务失败
     *
     * @var int
     */
    public $maxExceptions = 3;

    /**
     * 执行任务
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // 获取锁,处理 podcast...
        }, function () {
            // 无法获取锁,释放任务...
            return $this->release(10);
        });
    }
}

在这个例子中,如果应用程序无法获取 Redis 锁,任务会被释放 10 秒钟并重试最多 25 次。然而,如果任务抛出 3 次未处理异常,任务会失败。

超时

通常,你大概知道你的队列任务需要多长时间。因此,Laravel 允许你指定一个“超时”值。默认情况下,超时时间是 60 秒。如果任务的处理时间超过了指定的超时时间,处理任务的工作进程将退出并报错。通常,工作进程会被服务器上配置的进程管理器自动重启。

你可以使用 --timeout 参数在 Artisan 命令行中指定任务的最大运行时间(单位:秒):

php artisan queue:work --timeout=30

如果任务因为超时而超过最大尝试次数,它将被标记为失败。

你还可以在任务类中定义任务的最大运行时间。如果任务类中指定了超时,它将优先于命令行中指定的超时时间:

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务允许运行的最大时间(秒)
     *
     * @var int
     */
    public $timeout = 120;
}

有时,像套接字或外部 HTTP 请求这样的 IO 阻塞过程可能不遵守你指定的超时。因此,在使用这些特性时,你应该总是尝试通过它们的 API 来指定超时。例如,当使用 Guzzle 时,你应该始终指定连接和请求的超时值。

必须安装 pcntl PHP 扩展才能指定任务超时。此外,任务的 “超时” 值应该始终小于其 “重试间隔” 值,否则任务可能会在完成或超时之前重新尝试。

超时失败

如果你希望在任务超时后将其标记为失败,可以在任务类中定义 $failOnTimeout 属性:

/**
 * 指示任务是否应在超时后标记为失败
 *
 * @var bool
 */
public $failOnTimeout = true;

错误处理

手动释放任务

有时,你可能希望手动将任务释放回队列,以便稍后再进行尝试。你可以通过调用 release 方法来实现:

/**
 * 执行任务。
 */
public function handle(): void
{
    // ...

    $this->release();
}

默认情况下,release 方法会立即将任务释放回队列,供下一个工作进程处理。然而,你也可以通过向 release 方法传递一个整数或日期实例,指示队列在一定时间后再使任务可用:

$this->release(10); // 等待10秒后重试任务

$this->release(now()->addSeconds(10)); // 等待指定的时间后重试任务

手动标记任务为失败

有时你可能需要手动将任务标记为 “失败”。你可以通过调用 fail 方法来实现:

/**
 * 执行任务。
 */
public function handle(): void
{
    // ...

    $this->fail();
}

如果你想标记任务为失败,并且是因为捕获到的异常导致的,你可以将异常传递给 fail 方法。或者,为了方便,你也可以传递一个字符串错误信息,Laravel 会将其转换为异常:

$this->fail($exception); // 传递异常

$this->fail('Something went wrong.'); // 传递错误信息

有关失败任务的更多信息,请查看 处理失败任务的文档

任务批处理

Laravel 的任务批处理功能允许你轻松地执行一批任务,并在任务批次完成执行后执行某些操作。在开始之前,你应该创建一个数据库迁移来构建一个表,该表将包含有关任务批次的元信息,例如其完成百分比。你可以使用 make:queue-batches-table Artisan 命令生成该迁移:

php artisan make:queue-batches-table

php artisan migrate

定义可批处理任务

要定义一个可批处理的任务,你应该像正常一样创建一个队列任务;但是,你需要在任务类中添加 Illuminate\Bus\Batchable 特性。这个特性提供了对 batch 方法的访问,可以用来检索当前任务正在执行的批次:

<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ImportCsv implements ShouldQueue
{
    use Batchable, Queueable;

    /**
     * 执行任务。
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            // 判断批次是否已取消...

            return;
        }

        // 导入 CSV 文件的一部分...
    }
}

分派批处理任务

要调度一批任务,你应该使用 Bus 门面的 batch 方法。当然,批量处理最常用于结合完成回调一起使用。因此,你可以使用 thencatchfinally 方法来定义批次的完成回调。每个回调在调用时都会接收到一个 Illuminate\Bus\Batch 实例。以下是一个示例,假设我们正在调度一批处理 CSV 文件中指定行数的任务:

use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->before(function (Batch $batch) {
    // 批次已创建,但尚未添加任务...
})->progress(function (Batch $batch) {
    // 一个任务已成功完成...
})->then(function (Batch $batch) {
    // 所有任务都成功完成...
})->catch(function (Batch $batch, Throwable $e) {
    // 检测到第一个任务失败...
})->finally(function (Batch $batch) {
    // 批次执行完成...
})->dispatch();

return $batch->id;

批次的 ID 可以通过 $batch->id 属性访问,在任务调度后可以用来查询 Laravel 命令总线中关于该批次的信息。

由于批量回调是序列化的,并且在稍后由 Laravel 队列执行,因此在回调中不应使用 $this 变量。此外,由于批量任务被包装在数据库事务中,触发隐式提交的数据库语句不应在任务中执行。

批次命名

像 Laravel Horizon 和 Laravel Telescope 这样的工具,如果批次有命名,可能会提供更易于调试的用户友好信息。要为批次分配一个自定义名称,可以在定义批次时调用 name 方法:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有任务成功完成...
})->name('Import CSV')->dispatch();

批次连接和队列

如果你希望指定批量任务应该使用的连接和队列,可以使用 onConnectiononQueue 方法。所有批量任务必须在相同的连接和队列中执行:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有任务成功完成...
})->onConnection('redis')->onQueue('imports')->dispatch();

链式与批处理

你可以通过将链式任务放在数组中,在批次中定义一组链式任务。例如,我们可以并行执行两组任务链,并在两组任务链都完成后执行回调:

use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
    [
        new ReleasePodcast(1),
        new SendPodcastReleaseNotification(1),
    ],
    [
        new ReleasePodcast(2),
        new SendPodcastReleaseNotification(2),
    ],
])->then(function (Batch $batch) {
    // ...
})->dispatch();

反过来,你也可以在链式任务中运行批量任务,通过在【链】中定义批次。例如,你可以首先运行一批任务来发布多个播客,然后运行另一批任务来发送发布通知:

use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new FlushPodcastCache,
    Bus::batch([
        new ReleasePodcast(1),
        new ReleasePodcast(2),
    ]),
    Bus::batch([
        new SendPodcastReleaseNotification(1),
        new SendPodcastReleaseNotification(2),
    ]),
])->dispatch();

向批处理中添加任务

有时,从批量任务内部向批次添加额外的任务是很有用的。这种模式在需要批量处理成千上万个任务时尤其有用,因为这些任务可能会在网页请求期间耗费太多时间。因此,你可能希望先调度一批 “加载” 任务,这些任务会将更多的任务添加到批次中:

$batch = Bus::batch([
    new LoadImportBatch,
    new LoadImportBatch,
    new LoadImportBatch,
])->then(function (Batch $batch) {
    // 所有任务成功完成...
})->name('Import Contacts')->dispatch();

在这个例子中,我们使用 LoadImportBatch 任务来向批次中添加更多的任务。为此,我们可以使用批次实例上的 add 方法,该方法可以通过任务的 batch 方法访问:

use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * 执行任务。
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    // 向批次添加 1000 个任务
    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}

你只能从属于同一批次的任务中向批次添加任务。

检查批处理

Illuminate\Bus\Batch 实例提供了多种属性和方法,帮助你与批次中的任务进行交互和检查批次的状态:

// 批次的 UUID...
$batch->id;

// 批次的名称(如果有的话)...
$batch->name;

// 分配给批次的任务数量...
$batch->totalJobs;

// 尚未被队列处理的任务数量...
$batch->pendingJobs;

// 失败的任务数量...
$batch->failedJobs;

// 到目前为止已经处理的任务数量...
$batch->processedJobs();

// 批次的完成百分比(0-100)...
$batch->progress();

// 表示批次是否已完成执行...
$batch->finished();

// 取消批次的执行...
$batch->cancel();

// 表示批次是否已被取消...
$batch->cancelled();

从路由返回批次

所有 Illuminate\Bus\Batch 实例都是可 JSON 序列化的,这意味着你可以直接从应用的某个路由返回它们,以获取一个包含批次信息的 JSON 响应,其中包括批次的完成进度。这使得在应用的 UI 中显示批次的完成进度变得方便。

要通过批次的 ID 获取批次,你可以使用 Bus facade 的 findBatch 方法:

use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
    return Bus::findBatch($batchId);
});

取消批处理

有时你可能需要取消某个批次的执行。你可以通过调用 Illuminate\Bus\Batch 实例的 cancel 方法来实现这一点:

/**
 * 执行任务。
 */
public function handle(): void
{
    if ($this->user->exceedsImportLimit()) {
        return $this->batch()->cancel();
    }

    if ($this->batch()->cancelled()) {
        return;
    }
}

如你在之前的示例中所注意到的,批量任务通常应该在继续执行之前检查它们对应的批次是否已被取消。然而,为了方便,你可以为任务指定 SkipIfBatchCancelled 【中间件】。正如其名字所示,这个中间件会指示 Laravel 如果对应的批次已被取消,则不处理该任务:

use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * 获取任务应通过的中间件。
 */
public function middleware(): array
{
    return [new SkipIfBatchCancelled];
}

批处理失败

当批量任务失败时,如果已定义 catch 回调,该回调将被触发。此回调仅在批量中的第一个任务失败时被调用。

允许失败

当批次中的任务失败时,Laravel 会自动将该批次标记为 “已取消”。如果你希望禁用这一行为,使得任务失败时不会自动取消批次,可以在调度批次时调用 allowFailures 方法来实现:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有任务成功完成...
})->allowFailures()->dispatch();

重试失败的批量任务

为了方便起见,Laravel 提供了 queue:retry-batch Artisan 命令,可以轻松地重试给定批次的所有失败任务。queue:retry-batch 命令接受批次的 UUID,该 UUID 对应于要重试失败任务的批次:

php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

清理批处理

没有清理功能的话,job_batches 表可能会快速积累记录。为了减少这种情况,应该每天调度运行 queue:prune-batches Artisan 命令:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches')->daily();

默认情况下,所有完成超过 24 小时的批次将会被清理。你可以使用 --hours 选项来决定保留批次数据的时间。例如,以下命令会删除所有完成超过 48 小时的批次:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48')->daily();

有时,job_batches 表可能会积累未完成的批次记录,例如任务失败且该任务从未成功重试的批次。你可以使用 unfinished 选项来指示 queue:prune-batches 命令清理这些未完成的批次记录:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,job_batches 表也可能会积累已取消的批次记录。你可以使用 --cancelled 选项来指示 queue:prune-batches 命令清理这些已取消的批次记录:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

将批处理存储在 DynamoDB 中

Laravel 还支持将批次的元数据存储在 DynamoDB 中,而不是关系型数据库。为了实现这一点,您需要手动创建一个 DynamoDB 表来存储所有的批次记录。

通常,表名应为 job_batches,但是您可以根据应用程序队列配置文件中的 queue.batching.table 配置值来命名该表。

DynamoDB 批次表配置

job_batches 表应具有一个名为 application 的字符串主分区键和一个名为 id 的字符串主排序键。键中的 application 部分将包含应用程序的名称,该名称由应用程序的 app 配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用相同的表来存储多个 Laravel 应用程序的作业批次。

此外,如果您希望利用自动批次修剪功能,可以为表定义 ttl 属性。

DynamoDB 配置

接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 进行通信:

composer require aws/aws-sdk-php

然后,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,您还应在批次配置数组中定义 keysecretregion 配置选项。这些选项将用于与 AWS 进行身份验证。当使用 dynamodb 驱动程序时,queue.batching.database 配置选项不再需要:

'batching' => [
    'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
],

在 DynamoDB 中修剪批次

当使用 【DynamoDB】 存储作业批次信息时,通常用于修剪关系型数据库中的批次的命令将不起作用。相反,您可以利用 【DynamoDB 的原生 TTL(过期时间)功能】来自动删除旧批次记录。

如果您为 DynamoDB 表定义了 ttl 属性,则可以定义配置参数来指导 Laravel 如何修剪批次记录。queue.batching.ttl_attribute 配置值定义了保存 TTL 的属性名称,而 queue.batching.ttl 配置值定义了从记录最后一次更新时间起,可以将批次记录从 DynamoDB 表中删除的秒数:

'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
    'ttl_attribute' => 'ttl',
    'ttl' => 60 * 60 * 24 * 7, // 7 天...
],

队列闭包

除了将作业类调度到队列中,您还可以调度一个闭包。这对于需要在当前请求周期之外执行的快速、简单的任务非常有用。当将闭包调度到队列时,闭包的代码内容会被加密签名,以确保它在传输过程中不会被修改:

$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

通过 catch 方法,您可以提供一个闭包,如果队列中的闭包在尝试所有配置的重试次数后仍未成功完成,则该闭包会被执行:

use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // 该作业失败了...
});

由于 catch 回调是序列化的,并在稍后的时间由 Laravel 队列执行,因此在 catch 回调中不应使用 $this 变量。

运行队列工作进程

queue:work 命令

Laravel 包含一个 Artisan 命令,可以启动队列工作程序,并在将新的任务推送到队列时处理它们。你可以使用 queue:work Artisan 命令来运行该工作程序。请注意,一旦启动 queue:work 命令,它将持续运行,直到手动停止或关闭终端:

php artisan queue:work

为了让 queue:work 进程在后台永久运行,你应该使用进程监控工具,如 【Supervisor】,确保队列工作程序不会停止运行。

你还可以在调用 queue:work 命令时使用 -v 标志,以便在命令输出中包含已处理任务的 ID:

php artisan queue:work -v

请记住,队列工作程序是长时间运行的进程,并会将启动的应用状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。所以,在部署过程中,请确保重启队列工作程序。此外,应用中创建或修改的任何静态状态在任务之间不会自动重置。

或者,你可以运行 queue:listen 命令。当使用 queue:listen 命令时,你不必手动重启工作程序来重新加载更新的代码或重置应用状态;然而,这个命令比 queue:work 命令效率低得多:

php artisan queue:listen

运行多个队列工作程序

要分配多个工作程序来处理队列并并发处理任务,你只需启动多个 queue:work 进程。你可以通过多个终端标签在本地进行操作,或者在生产环境中通过进程管理器的配置设置来完成。使用 Supervisor 时,你可以使用 numprocs 配置值来启动多个进程。

指定连接和队列

你还可以指定工作程序应使用的队列连接。传递给 queue:work 命令的连接名称应该与 config/queue.php 配置文件中定义的连接之一相对应:

php artisan queue:work redis

默认情况下,queue:work 命令只处理给定连接的默认队列。然而,你可以进一步自定义队列工作程序,只处理给定连接的特定队列。例如,如果你所有的电子邮件都在 Redis 队列连接中的 emails 队列中处理,你可以使用以下命令启动一个仅处理该队列的工作程序:

php artisan queue:work redis --queue=emails

处理指定数量的任务

--once 选项可用于指示工作程序只处理队列中的一个任务:

php artisan queue:work --once

--max-jobs 选项可用于指示工作程序处理给定数量的任务,然后退出。这个选项在与 Supervisor 配合使用时非常有用,能够在处理一定数量的任务后自动重启工作程序,释放它们可能累积的内存:

php artisan queue:work --max-jobs=1000

处理所有队列中的任务然后退出

--stop-when-empty 选项可用于指示工作程序处理完所有任务后优雅地退出。这个选项在 Docker 容器中处理 Laravel 队列时非常有用,如果你希望在队列为空时关闭容器:

php artisan queue:work --stop-when-empty

处理任务指定的秒数

--max-time 选项可用于指示工作程序处理任务指定的秒数,然后退出。这个选项与 【Supervisor】 配合使用时也很有用,能够在处理一定时间的任务后自动重启工作程序,释放它们可能累积的内存:

# 处理一个小时的任务,然后退出...
php artisan queue:work --max-time=3600

工作程序睡眠时长

当队列中有任务时,工作程序将不断处理任务,任务之间没有延迟。然而,sleep 选项决定了如果没有任务可用时,工作程序将 “睡眠” 多少秒。自然,在睡眠期间,工作程序不会处理新任务:

php artisan queue:work --sleep=3

维护模式和队列

当应用处于维护模式时,队列中的任务将不会被处理。任务会在应用退出维护模式后继续正常处理。

如果你希望即使在维护模式启用时,队列工作程序也能处理任务,你可以使用 --force 选项:

php artisan queue:work --force

资源考虑

守护进程队列工作程序在处理每个任务之前不会 “重启” 框架。因此,你应该在每个任务完成后释放任何重的资源。例如,如果你正在使用 GD 库进行图像处理,你应该在完成图像处理后使用 imagedestroy 来释放内存。

队列优先级

有时候,你可能希望优先处理队列中的任务。例如,在 config/queue.php 配置文件中,你可以将 Redis 连接的默认队列设置为低优先级。然而,偶尔你可能希望将一个任务推送到高优先级的队列,像这样:

dispatch((new Job)->onQueue('high'));

为了启动一个工作程序,确保所有高优先级队列的任务在继续处理低优先级队列的任务之前被处理,可以在 work 命令中传递一个由逗号分隔的队列名称列表:

php artisan queue:work --queue=high,low

队列工作进程与部署

由于队列工作程序是长期运行的进程,它们在没有重启的情况下不会注意到代码的更改。因此,部署使用队列工作程序的应用程序的最简单方法是在部署过程中重启工作程序。你可以通过执行 queue:restart 命令来优雅地重启所有工作程序:

php artisan queue:restart

该命令会指示所有队列工作程序在完成当前任务后优雅地退出,以确保不会丢失现有的任务。由于队列工作程序会在执行 queue:restart 命令时退出,因此你应该使用进程管理工具(如 【Supervisor】)来自动重启队列工作程序。

队列使用【缓存】来存储重启信号,因此在使用此功能之前,你应该确认你的应用程序已正确配置了缓存驱动。

任务过期与超时

任务过期

config/queue.php 配置文件中,每个队列连接都定义了一个 retry_after 选项。该选项指定了队列连接在重新尝试一个正在处理的任务之前应等待的秒数。例如,如果 retry_after 的值设置为 90,当任务已经处理了 90 秒而没有被释放或删除时,任务将被重新放回队列。通常,你应该将 retry_after 的值设置为任务合理完成处理所需的最大秒数。

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 会根据 AWS 控制台中管理的 【默认可见性超时】 来重试任务。

工作程序超时

queue:work Artisan 命令提供了一个 --timeout 选项。默认情况下,--timeout 的值为 60 秒。如果一个任务的处理时间超过了 timeout 值指定的秒数,处理该任务的工作程序将以错误状态退出。通常,工作程序会被配置在服务器上的进程管理器自动重启:

php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项是不同的,但它们共同工作,确保任务不会丢失,并且任务只会成功处理一次。

--timeout 值应该始终比 retry_after 配置值短几秒钟。这将确保处理冻结任务的工作程序在任务被重试之前总是被终止。如果你的 --timeout 选项比 retry_after 配置值长,可能会导致任务被处理两次。

监控配置

在生产环境中,你需要一种方法来保持 queue:work 进程的持续运行。queue:work 进程可能因各种原因停止运行,例如超出了工作程序的超时时间或执行了 queue:restart 命令。

因此,你需要配置一个进程监控工具,能够检测到 queue:work 进程退出并自动重新启动它们。此外,进程监控工具还可以让你指定希望同时运行多少个 queue:work 进程。Supervisor 是一个常见的 Linux 环境下的进程监控工具,接下来的文档将讨论如何配置它。

安装 Supervisor

Supervisor 是一个适用于 Linux 操作系统的进程监控工具,能够在 queue:work 进程失败时自动重启它们。要在 Ubuntu 上安装 Supervisor,你可以使用以下命令:

sudo apt-get install supervisor

如果自己配置和管理 Supervisor 感到困难,可以考虑使用 【Laravel Forge】,它会为你的生产环境 Laravel 项目自动安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在这个目录下,你可以创建任意数量的配置文件,指示 Supervisor 如何监控你的进程。例如,我们可以创建一个 laravel-worker.conf 文件,来启动并监控 queue:work 进程:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在这个例子中,numprocs 指令将指示 Supervisor 启动 8 个 queue:work 进程,并监控它们,如果进程失败,会自动重启它们。你应该根据你的队列连接和工作程序选项修改 command 指令。

你需要确保 stopwaitsecs 的值大于你最长运行任务所消耗的秒数。否则,Supervisor 可能会在任务处理完之前终止它。

启动 Supervisor

一旦配置文件创建完毕,你可以使用以下命令更新 Supervisor 配置并启动进程:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"

有关 Supervisor 的更多信息,可以查阅 【Supervisor 文档】。

处理失败任务

有时,你的队列任务会失败。别担心,事情并不总是按计划进行!Laravel 提供了一个方便的方式来指定任务应该被尝试的最大次数。一个异步任务超过这个最大尝试次数后,它将被插入到 failed_jobs 数据库表中。同步分派的任务如果失败,异常会立即由应用程序处理,而不会存储在该表中。

通常,新的 Laravel 应用程序中会已经包含一个用于创建 failed_jobs 表的迁移文件。然而,如果你的应用没有包含该表的迁移,你可以使用 make:queue-failed-table 命令来创建迁移:

php artisan make:queue-failed-table

php artisan migrate

在运行队列工作程序进程时,你可以使用 --tries 选项指定任务最大重试次数。如果没有为 --tries 选项指定值,任务将只会尝试一次,或者根据任务类的 $tries 属性进行重试:

php artisan queue:work redis --tries=3

使用 --backoff 选项,你可以指定 Laravel 在任务遇到异常时应该等待多少秒再重试。默认情况下,任务会立即被释放回队列,等待再次尝试:

php artisan queue:work redis --tries=3 --backoff=3

如果你想要为每个任务单独配置重试延迟,你可以在任务类中定义一个 backoff 属性:

/**
 * 任务重试前等待的秒数。
 *
 * @var int
 */
public $backoff = 3;

如果你需要更复杂的逻辑来确定任务的重试延迟,可以在任务类中定义一个 backoff 方法:

/**
 * 计算任务重试前等待的秒数。
 */
public function backoff(): int
{
    return 3;
}

你可以通过返回一个包含重试延迟值的数组来轻松配置 “指数” 重试延迟。在以下示例中,第一次重试的延迟为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒,之后的所有重试也都为 10 秒:

/**
 * 计算任务重试前等待的秒数。
 *
 * @return array<int, int>
 */
public function backoff(): array
{
    return [1, 5, 10];
}

清理失败任务

当某个任务失败时,你可能希望向用户发送通知,或者撤销任务可能已部分完成的操作。为此,你可以在任务类中定义一个 failed 方法。导致任务失败的 Throwable 实例会被传递给 failed 方法:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的任务实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行任务。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }

    /**
     * 处理任务失败。
     */
    public function failed(?Throwable $exception): void
    {
        // 向用户发送失败通知等...
    }
}

在调用 failed 方法之前,任务会重新实例化;因此,在 handle 方法中可能发生的任何类属性修改都会丢失。

重试失败任务

要查看所有已插入 failed_jobs 数据库表中的失败任务,你可以使用 queue:failed Artisan 命令:

php artisan queue:failed

queue:failed 命令将列出任务的 ID、连接、队列、失败时间以及其它有关任务的信息。任务 ID 可用于重新尝试失败的任务。例如,要重试一个 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败任务,可以使用以下命令:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如果需要,你可以传递多个任务 ID 给命令:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

你也可以选择只重试特定队列的所有失败任务:

php artisan queue:retry --queue=name

要重试所有失败的任务,可以执行 queue:retry 命令并传递 all 作为 ID:

php artisan queue:retry all

如果你想删除一个失败的任务,可以使用 queue:forget 命令:

php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

当使用 Horizon 时,应该使用 horizon:forget 命令来删除失败的任务,而不是 queue:forget 命令。

要从 failed_jobs 表中删除所有失败的任务,可以使用 queue:flush 命令:

php artisan queue:flush

忽略缺失模型

当将 Eloquent 模型注入到一个任务中时,模型会在放入队列之前自动进行序列化,并在任务处理时从数据库重新检索。然而,如果在任务等待被工作进程处理期间,模型被删除,任务可能会因 ModelNotFoundException 异常而失败。

为了方便起见,你可以选择在模型缺失时自动删除任务,方法是将任务的 deleteWhenMissingModels 属性设置为 true。当此属性设置为 true 时,Laravel 会在没有抛出异常的情况下悄悄丢弃该任务:

/**
 * 如果任务的模型不存在,则删除该任务。
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

清理失败任务

你可以通过执行 queue:prune-failed Artisan 命令来清理应用程序中的 failed_jobs 表记录:

php artisan queue:prune-failed

默认情况下,所有超过 24 小时的失败任务记录将被清理。如果你提供 --hours 选项,只有在过去 N 小时内插入的失败任务记录会被保留。例如,以下命令将删除所有插入时间超过 48 小时的失败任务记录:

php artisan queue:prune-failed --hours=48

将失败任务存储在 DynamoDB 中

Laravel 还支持将失败的任务记录存储在 DynamoDB 中,而不是关系型数据库表中。然而,你需要手动创建一个 DynamoDB 表来存储所有失败的任务记录。通常,这个表应该命名为 failed_jobs,但你应该根据应用程序的队列配置文件中的 queue.failed.table 配置值来命名该表。

failed_jobs 表应该有一个名为 application 的字符串类型主分区键和一个名为 uuid 的字符串类型主排序键。application 键部分将包含你的应用程序名称,该名称由应用程序配置文件中的 name 配置值定义。由于应用程序名称是 DynamoDB 表的键的一部分,你可以使用同一个表来存储多个 Laravel 应用程序的失败任务。

此外,确保你安装了 AWS SDK,以便你的 Laravel 应用程序能够与 Amazon DynamoDB 进行通信:

composer require aws/aws-sdk-php

接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,你应该在失败任务配置数组中定义 keysecretregion 配置选项。这些选项将用于与 AWS 进行身份验证。当使用 dynamodb 驱动时,queue.failed.database 配置选项将不再需要:

'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

禁用失败任务存储

你可以通过将 queue.failed.driver 配置选项的值设置为 null,指示 Laravel 丢弃失败的任务,而不将其存储。通常,这可以通过设置 QUEUE_FAILED_DRIVER 环境变量来实现:

QUEUE_FAILED_DRIVER=null

失败任务事件

如果你希望注册一个事件监听器,当任务失败时会被触发,可以使用 Queue 门面的 failing 方法。例如,我们可以在 Laravel 提供的 AppServiceProviderboot 方法中附加一个闭包来处理这个事件:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册应用服务。
     */
    public function register(): void
    {
        // ...
    }

    /**
     * 启动应用服务。
     */
    public function boot(): void
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

清除队列中的任务

在使用 Horizon 时,应该使用 horizon:clear 命令来清除队列中的任务,而不是使用 queue:clear 命令。

如果你希望删除默认连接的默认队列中的所有任务,可以使用 queue:clear Artisan 命令:

php artisan queue:clear

你还可以提供 connection 参数和 queue 选项,以便从特定的连接和队列中删除任务:

php artisan queue:clear redis --queue=emails

清除队列中的任务仅适用于 SQS、Redis 和数据库队列驱动。此外,SQS 消息删除过程可能需要最多 60 秒,因此在你清空队列后最多 60 秒内发送到 SQS 队列的任务也可能被删除。

监控队列

如果你的队列接收到大量任务,可能会导致队列超载,从而导致任务处理的等待时间过长。如果需要,Laravel 可以在队列中的任务数量超过指定阈值时提醒你。

首先,你应该将 queue:monitor 命令安排每分钟运行一次。该命令接受你希望监控的队列名称以及你希望设置的任务数量阈值:

php artisan queue:monitor redis:default,redis:deployments --max=100

仅仅安排此命令并不足以触发当队列任务数量超过阈值时的通知。命令在遇到任务数量超过阈值的队列时,会触发一个 Illuminate\Queue\Events\QueueBusy 事件。你可以在应用的 AppServiceProvider 中监听此事件,以便向你或你的开发团队发送通知:

use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * 启动任何应用服务。
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
                ->notify(new QueueHasLongWaitTime(
                    $event->connection,
                    $event->queue,
                    $event->size
                ));
    });
}

测试

在测试调度任务的代码时,你可能希望指示 Laravel 不实际执行任务,因为任务的代码可以独立于调度它的代码进行测试。自然地,要测试任务本身,你可以直接实例化任务类并在测试中调用 handle 方法。

你可以使用 Queue facade 的 fake 方法来防止任务被真正推送到队列中。在调用 Queue::fake 方法之后,你可以断言应用是否尝试将任务推送到队列:

  • Pest

  • PHPUnit

<?php

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;

test('orders can be shipped', function () {
    Queue::fake();

    // Perform order shipping...

    // Assert that no jobs were pushed...
    Queue::assertNothingPushed();

    // Assert a job was pushed to a given queue...
    Queue::assertPushedOn('queue-name', ShipOrder::class);

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);

    // Assert a job was not pushed...
    Queue::assertNotPushed(AnotherJob::class);

    // Assert that a Closure was pushed to the queue...
    Queue::assertClosurePushed();

    // Assert the total number of jobs that were pushed...
    Queue::assertCount(3);
});
<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
    public function test_orders_can_be_shipped(): void
    {
        Queue::fake();

        // Perform order shipping...

        // Assert that no jobs were pushed...
        Queue::assertNothingPushed();

        // Assert a job was pushed to a given queue...
        Queue::assertPushedOn('queue-name', ShipOrder::class);

        // Assert a job was pushed twice...
        Queue::assertPushed(ShipOrder::class, 2);

        // Assert a job was not pushed...
        Queue::assertNotPushed(AnotherJob::class);

        // Assert that a Closure was pushed to the queue...
        Queue::assertClosurePushed();

        // Assert the total number of jobs that were pushed...
        Queue::assertCount(3);
    }
}

你可以向 assertPushedassertNotPushed 方法传递一个闭包,来断言某个任务是否被推送,并且该任务符合给定的“真值测试”。如果至少有一个任务通过了该测试,断言将会成功:

Queue::assertPushed(function (ShipOrder $job) use ($order) {
    return $job->order->id === $order->id;
});

伪造部分任务

如果你只需要模拟特定的任务,同时允许其它任务正常执行,你可以将需要模拟的任务类名传递给 fake 方法:

  • Pest

  • PHPUnit

test('orders can be shipped', function () {
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);
}

你也可以使用 except 方法模拟所有任务,除了指定的任务集合:

Queue::fake()->except([
    ShipOrder::class,
]);

测试任务链

要测试任务链,你需要利用 Bus facade 的模拟功能。Bus facade 的 assertChained 方法可以用来断言任务链是否被分发。assertChained 方法接受一个任务链数组作为第一个参数:

use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertChained([
    ShipOrder::class,
    RecordShipment::class,
    UpdateInventory::class
]);

如上例所示,任务链的数组可以是任务类的类名数组。你也可以提供一个实际的任务实例数组。在这种情况下,Laravel 会确保任务实例属于相同的类,并且拥有与应用程序中分发的任务链相同的属性值:

Bus::assertChained([
    new ShipOrder,
    new RecordShipment,
    new UpdateInventory,
]);

你可以使用 assertDispatchedWithoutChain 方法断言某个任务在没有任务链的情况下被推送:

Bus::assertDispatchedWithoutChain(ShipOrder::class);

测试任务链的修改

如果某个任务在任务链前面或后面添加了其它任务,你可以使用任务的 assertHasChain 方法来断言任务是否具有预期的剩余任务链:

$job = new ProcessPodcast;

$job->handle();

$job->assertHasChain([
    new TranscribePodcast,
    new OptimizePodcast,
    new ReleasePodcast,
]);

assertDoesntHaveChain 方法可以用来断言任务的剩余链为空:

$job->assertDoesntHaveChain();

测试链中的批量任务

如果你的任务链包含一个任务批量,你可以通过在链断言中插入 Bus::chainedBatch 来断言链中的批量任务符合预期:

use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::assertChained([
    new ShipOrder,
    Bus::chainedBatch(function (PendingBatch $batch) {
        return $batch->jobs->count() === 3;
    }),
    new UpdateInventory,
]);

测试任务批处理

Bus facade 的 assertBatched 方法可用于断言是否分发了一个批量任务。传递给 assertBatched 方法的闭包会接收一个 Illuminate\Bus\PendingBatch 实例,可以用来检查批量任务中的任务:

use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->name == 'import-csv' &&
           $batch->jobs->count() === 10;
});

你可以使用 assertBatchCount 方法断言是否分发了给定数量的批量任务:

Bus::assertBatchCount(3);

你还可以使用 assertNothingBatched 方法断言没有分发任何批量任务:

Bus::assertNothingBatched();

测试任务与批量任务的交互

此外,有时你可能需要测试单个任务与其底层批量任务的交互。例如,你可能需要测试一个任务是否取消了其批量任务的进一步处理。为此,你需要通过 withFakeBatch 方法将一个假批量任务分配给任务。withFakeBatch 方法返回一个包含任务实例和假批量任务的元组:

[$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

测试任务 / 队列交互

有时,你可能需要测试一个排队任务是否将自己重新放回队列中,或者你可能需要测试任务是否删除了自己。你可以通过实例化任务并调用 withFakeQueueInteractions 方法来测试这些队列交互。

一旦任务的队列交互被伪造,你可以调用任务的 handle 方法。调用任务后,可以使用 assertReleasedassertDeletedassertNotDeletedassertFailedassertNotFailed 方法对任务的队列交互进行断言:

use App\Jobs\ProcessPodcast;

$job = (new ProcessPodcast)->withFakeQueueInteractions();

$job->handle();

$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertNotFailed();

任务事件

Bus facade 的 assertBatched 方法可以用来断言一批任务是否被分发。传递给 assertBatched 方法的闭包接收一个 Illuminate\Bus\PendingBatch 实例,你可以使用它来检查批量任务中的任务:

use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->name == 'import-csv' &&
           $batch->jobs->count() === 10;
});

你可以使用 assertBatchCount 方法来断言是否分发了指定数量的批量任务:

Bus::assertBatchCount(3);

你也可以使用 assertNothingBatched 方法来断言没有分发任何批量任务:

Bus::assertNothingBatched();

测试任务与批量任务的交互

此外,有时你可能需要测试单个任务与其底层批量任务的交互。例如,你可能需要测试一个任务是否取消了其批量任务的后续处理。为了实现这一点,你需要通过 withFakeBatch 方法将一个假的批量任务分配给该任务。withFakeBatch 方法返回一个包含任务实例和假的批量任务的元组:

[$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);