队列
介绍
在构建 web 应用程序时,有些任务(例如解析和存储上传的 CSV 文件)可能需要花费很长时间才能完成,这样在常规的 web 请求中执行可能会造成性能问题。幸运的是,Laravel 允许你轻松创建队列任务,这些任务可以在后台处理。通过将耗时的任务移到队列中,你的应用程序能够快速响应 web 请求,提供更好的用户体验。
Laravel 队列为不同的队列后端(如 Amazon SQS、 Redis,甚至是关系型数据库)提供了统一的队列 API。
Laravel 的队列配置选项存储在应用程序的 config/queue.php
配置文件中。在这个文件中,你将找到与框架中包含的各个队列驱动程序相关的连接配置,包括数据库驱动、 Amazon SQS、 Redis 和 Beanstalkd 驱动,以及一个会立即执行任务的同步驱动(用于本地开发)。此外,还有一个 null
队列驱动,它会丢弃队列中的任务。
Laravel 现在提供了 Horizon,这是一个美丽的仪表盘和配置系统,专门为 Redis 驱动的队列设计。欲了解更多信息,请查看完整的 【Horizon 文档】。 |
连接与队列
在开始使用 Laravel 队列之前,理解 “连接” 和 “队列” 之间的区别非常重要。在你的 config/queue.php
配置文件中,有一个 connections
配置数组。这个选项定义了与后台队列服务(如 Amazon SQS、Beanstalk 或 Redis)连接的配置。然而,任何给定的队列连接可能会有多个 “队列”,这些队列可以理解为不同的任务堆栈或堆积。
请注意,在队列配置文件中的每个连接配置示例都包含一个 queue
属性。这个属性定义了任务被派发时,默认发送到哪个队列。换句话说,如果你派发任务时没有明确指定应该发送到哪个队列,任务将会被放置到连接配置中定义的默认队列:
use App\Jobs\ProcessPodcast;
// 这个任务会被发送到默认连接的默认队列...
ProcessPodcast::dispatch();
// 这个任务会被发送到默认连接的 "emails" 队列...
ProcessPodcast::dispatch()->onQueue('emails');
有些应用程序可能不需要将任务推送到多个队列,而是倾向于使用一个简单的队列。然而,将任务推送到多个队列对于希望优先处理或对任务处理进行分段的应用程序特别有用,因为 Laravel 队列工作器允许你指定应该按优先级处理哪些队列。例如,如果你将任务推送到一个高优先级队列,你可以运行一个工作进程,赋予该队列更高的处理优先级:
php artisan queue:work --queue=high,default
驱动程序说明和先决条件
数据库
为了使用数据库队列驱动,你需要一个数据库表来存储任务。通常,Laravel 的默认迁移文件 0001_01_01_000002_create_jobs_table.php
包含了这个表的创建;如果你的应用程序中没有这个迁移文件,你可以使用 make:queue-table
Artisan 命令来创建它:
php artisan make:queue-table
php artisan migrate
Redis
为了使用 Redis
队列驱动,你需要在 config/database.php
配置文件中配置一个 Redis 数据库连接。
Redis 队列驱动不支持 |
Redis 集群
如果你的 Redis 队列连接使用 Redis 集群,你的队列名称必须包含一个键【哈希标签】。这是必需的,以确保给定队列的所有 Redis 键都被放置在相同的哈希槽中:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],
阻塞模式
当使用 Redis 队列时,你可以使用 block_for
配置选项来指定驱动程序应该等待多长时间,直到任务可用,然后才会进入工作循环并重新轮询 Redis 数据库。
根据队列负载调整此值,比起不断轮询 Redis 数据库查找新任务,可能会更加高效。例如,你可以将值设置为 5,表示驱动程序应在等待任务可用时阻塞 5 秒:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],
将 |
创建任务
生成任务类
默认情况下,你的应用程序中的所有可排队任务都会存储在 app/Jobs
目录中。如果 app/Jobs
目录不存在,在你运行 make:job
Artisan 命令时会自动创建该目录:
php artisan make:job ProcessPodcast
生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue
接口,告诉 Laravel 该任务应该被推送到队列中以异步执行。
你可以通过【发布存根】来定制任务的存根文件。 |
类结构
作业类非常简单,通常只包含一个 handle
方法,当作业由队列处理时,该方法会被调用。为了更好地理解,让我们看一个示例作业类。在这个示例中,我们假设我们管理一个播客发布服务,需要在发布之前处理上传的播客文件:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 创建一个新的作业实例。
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* 执行作业。
*/
public function handle(AudioProcessor $processor): void
{
// 处理上传的播客...
}
}
在这个示例中,注意我们能够直接将 【Eloquent 模型】传递给排队作业的构造函数。由于作业使用了 Queueable
特性,Eloquent 模型及其已加载的关系会在处理作业时被优雅地序列化和反序列化。
如果你的排队作业接受 Eloquent 模型作为构造函数的参数,只有模型的标识符会被序列化到队列中。当作业被处理时,队列系统会自动从数据库重新获取完整的模型实例及其已加载的关系。通过这种方式,模型的序列化允许队列载荷更小。
handle
方法的依赖注入
handle
方法是在作业通过队列处理时被调用的。注意,我们能够在作业的 handle
方法上进行类型提示,Laravel 服务容器会自动注入这些依赖项。
如果你希望完全控制容器如何注入依赖项到 handle
方法中,可以使用容器的 bindMethod
方法。bindMethod
方法接受一个回调,该回调接收作业和容器。在回调内,你可以自由地调用 handle
方法。通常,你应该在 App\Providers\AppServiceProvider
服务提供者的 boot
方法中调用此方法:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
像原始图像内容等二进制数据,在传递给排队作业之前应该先通过 |
排队关系
由于所有加载的 Eloquent 模型关系也会在作业排队时被序列化,因此序列化的作业字符串有时可能会变得非常大。此外,当作业被反序列化并且模型关系从数据库中重新获取时,它们会被完全获取。反序列化时,任何之前应用到模型的关系约束将不会再应用。因此,如果你希望只操作给定关系的子集,你应该在排队作业中重新约束该关系。
或者,为了防止关系被序列化,你可以在设置属性值时调用模型的 withoutRelations
方法。这个方法会返回一个不包含已加载关系的模型实例:
/**
* 创建一个新的作业实例。
*/
public function __construct(
Podcast $podcast,
) {
$this->podcast = $podcast->withoutRelations();
}
如果你使用 PHP 的构造函数属性提升,并希望指示一个 Eloquent 模型不应序列化其关系,可以使用 WithoutRelations
属性:
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* 创建一个新的作业实例。
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast,
) {}
如果一个作业接收的是 Eloquent 模型的集合或数组,而不是单个模型,则在作业被反序列化并执行时,集合中的模型将不会恢复其关系。这是为了防止在处理大量模型的作业中占用过多的资源。
唯一任务
唯一的任务需要一个支持【锁】的缓存驱动。目前, |
有时,您可能希望确保在任何时间点,队列中只有一个特定任务的实例。您可以通过在任务类上实现 ShouldBeUnique
接口来实现这一点。此接口不需要您在类中定义任何额外的方法:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
}
在上面的示例中,UpdateSearchIndex
任务是唯一的。因此,如果队列中已有该任务的实例并且尚未完成处理,则不会调度该任务。
在某些情况下,您可能希望定义一个特定的 “键” 来使任务唯一,或者希望指定一个超时值,超时后任务不再保持唯一。为此,您可以在任务类中定义 uniqueId
和 uniqueFor
属性或方法:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Product
*/
public $product;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}
在上面的示例中,UpdateSearchIndex
任务通过产品 ID 来唯一化。因此,任何带有相同产品 ID 的任务调度都将被忽略,直到现有任务完成处理。此外,如果现有任务未在一小时内完成处理,则唯一锁将被释放,且可以调度一个具有相同唯一键的新任务。
如果您的应用程序从多个 Web 服务器或容器调度任务,您应确保所有服务器都与同一个中央缓存服务器进行通信,这样 Laravel 才能准确地判断任务是否唯一。 |
在处理开始之前保持任务唯一
默认情况下,唯一任务会在任务完成处理或失败所有重试后解锁。然而,有时您希望任务在处理之前立即解锁。为此,您的任务应该实现 ShouldBeUniqueUntilProcessing
合同,而不是 ShouldBeUnique
合同:
<?php
use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}
唯一任务锁
在后台,当调度 ShouldBeUnique
任务时,Laravel 会尝试获取带有 uniqueId
键的锁。如果未能获取锁,任务将不会被调度。该锁会在任务完成处理或失败所有重试后释放。默认情况下,Laravel 会使用默认缓存驱动程序来获取此锁。不过,如果您希望使用其他驱动程序来获取锁,可以定义一个 uniqueVia
方法,该方法返回应该用于获取锁的缓存驱动程序:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
...
/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
如果您只需要限制任务的并发处理,请改用 |
任务中间件
任务中间件允许你在队列任务的执行过程中封装自定义逻辑,从而减少任务本身的样板代码。例如,考虑以下 handle
方法,它利用 Laravel 的 Redis 限流特性,每 5 秒钟只允许一个任务执行:
use Illuminate\Support\Facades\Redis;
/**
* 执行任务。
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('锁定成功...');
// 处理任务...
}, function () {
// 无法获取锁...
return $this->release(5);
});
}
虽然这段代码是有效的,但 handle
方法的实现变得很冗长,因为它充斥着 Redis 限流逻辑。此外,这些限流逻辑需要在其它想要进行限流的任务中重复编写。
与其在 handle
方法中进行限流,我们可以定义一个任务中间件来处理限流逻辑。Laravel 并没有为任务中间件指定默认位置,所以你可以将任务中间件放置在应用程序中的任何位置。在这个例子中,我们将中间件放置在 app/Jobs/Middleware
目录下:
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* 处理队列任务。
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// 锁定成功...
$next($job);
}, function () use ($job) {
// 无法获取锁...
$job->release(5);
});
}
}
正如你所看到的,和【路由中间件】一样,任务中间件接收正在处理的任务和一个回调,回调需要在任务处理完成后继续执行。
创建任务中间件后,你可以通过在任务的 middleware
方法中返回它们,将中间件附加到任务上。需要注意的是,这个 middleware
方法在使用 make:job
Artisan 命令生成的任务中并不存在,所以你需要手动将它添加到任务类中:
use App\Jobs\Middleware\RateLimited;
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
任务中间件也可以分配给可排队的事件监听器、邮件和通知。 |
速率限制
虽然我们刚刚演示了如何编写自己的限流任务中间件,Laravel 实际上包含了一个可以用来限流任务的中间件。与路由限流器类似,任务限流器是通过 RateLimiter
门面中的 for
方法来定义的。
例如,你可能希望允许用户每小时备份一次数据,但对付费用户不加限制。为了实现这一点,你可以在 AppServiceProvider
的 boot
方法中定义一个 RateLimiter:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* 启动应用程序服务。
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}
在上面的例子中,我们定义了一个每小时的限流;不过,你也可以使用 perMinute
方法定义基于分钟的限流。此外,你可以将任何值传递给限流的 by
方法;不过,这个值通常用来根据客户进行限流分段:
return Limit::perMinute(50)->by($job->user->id);
一旦定义了限流,你可以通过 Illuminate\Queue\Middleware\RateLimited
中间件将限流器附加到任务上。每次任务超过限流时,这个中间件会根据限流的持续时间,将任务释放回队列,并附加适当的延迟。
use Illuminate\Queue\Middleware\RateLimited;
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}
将一个限流的任务释放回队列时,任务的总尝试次数仍然会增加。你可能需要相应地调整任务类中的 tries
和 maxExceptions
属性。或者,你可以使用 retryUntil
方法来定义任务不再尝试的时间。
如果你不希望任务在限流时被重试,可以使用 dontRelease
方法:
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
如果你使用 Redis,你可以使用 |
防止任务重叠
Laravel 包含了一个 Illuminate\Queue\Middleware\WithoutOverlapping
中间件,允许你根据任意的键来防止任务的重叠。当一个队列任务正在修改某个资源时,这个功能非常有用,特别是当你希望同一时间只有一个任务能够修改该资源时。
例如,假设你有一个队列任务用来更新用户的信用评分,并且你希望防止同一个用户 ID 的信用评分更新任务发生重叠。为此,你可以在任务的 middleware
方法中返回 WithoutOverlapping
中间件:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}
任何相同类型的重叠任务都会被释放回队列。你还可以指定在任务被释放后,多少秒后该任务将重新尝试执行:
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}
如果你希望立即删除任何重叠任务,以避免它们被重试,可以使用 dontRelease
方法:
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}
WithoutOverlapping
中间件是通过 Laravel 的原子锁功能来实现的。有时候,任务可能会意外失败或超时,导致锁未被释放。因此,你可以显式地定义一个锁的过期时间,使用 expireAfter
方法。例如,下面的代码会在任务开始处理后的三分钟内指示 Laravel 释放 WithoutOverlapping
锁:
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
|
在任务类之间共享锁键
默认情况下,WithoutOverlapping
中间件只会防止相同类的任务重叠。因此,即使两个不同的任务类使用相同的锁键,它们也不会被防止重叠。然而,你可以通过 shared
方法指示 Laravel 在不同任务类之间共享锁键:
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
通过这种方式,ProviderIsDown
和 ProviderIsUp
两个不同的任务类都将共享相同的锁键,从而避免它们之间的重叠。
限制异常
Laravel 包含了一个 Illuminate\Queue\Middleware\ThrottlesExceptions
中间件,允许你对异常进行限速。一旦任务抛出指定数量的异常,所有进一步的任务执行将被延迟,直到指定的时间间隔过去。这个中间件对于与不稳定的第三方服务交互的任务尤其有用。
例如,假设你有一个与第三方 API 交互的队列任务,该 API 开始抛出异常。为了限速这些异常,你可以在任务的 middleware
方法中返回 ThrottlesExceptions
中间件。通常,这个中间件应与实现【基于时间的尝试次数】的任务一起使用:
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5 * 60)];
}
/**
* 确定任务的超时时间。
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(30);
}
此中间件的第一个构造函数参数是任务可以抛出的异常数量,超过该数量后任务将被限速。第二个构造函数参数是当任务被限速后,任务将在多少秒后再次尝试执行。在上面的示例中,如果任务连续抛出 10 个异常,我们将等待 5 分钟后再尝试执行该任务,受限于 30 分钟的时间限制。
当任务抛出异常,但异常阈值尚未达到时,任务通常会立即重试。但是,你可以通过在将中间件附加到任务时调用 backoff
方法来指定任务重试时的延迟时间:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}
内部,这个中间件使用 Laravel 的缓存系统来实现速率限制,且任务的类名被用作缓存的 “键”。你可以通过在附加中间件时调用 by
方法来覆盖此键。如果你有多个任务与同一个第三方服务交互,并且希望它们共享一个公共的限速 “桶”,那么这个功能会很有用:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}
默认情况下,该中间件将限速所有异常。你可以通过在附加中间件时调用 when
方法来修改此行为。只有当传递给 when
方法的闭包返回 true
时,异常才会被限速:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
如果你希望将限速的异常报告给应用程序的异常处理程序,可以通过在附加中间件时调用 report
方法来实现。你还可以选择为 report
方法提供一个闭包,只有当闭包返回 true
时,异常才会被报告:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* 获取任务通过的中间件。
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
如果你使用 Redis,可以使用 |
跳过任务
Skip
中间件允许你指定任务应该被跳过或删除,而无需修改任务的逻辑。如果给定的条件为 true
,Skip::when
方法将删除该任务,而 Skip::unless
方法则会在条件为 false
时删除任务:
use Illuminate\Queue\Middleware\Skip;
/**
* 获取任务通过的中间件。
*/
public function middleware(): array
{
return [
Skip::when($someCondition),
];
}
你还可以将一个闭包传递给 when
和 unless
方法,以进行更复杂的条件评估:
use Illuminate\Queue\Middleware\Skip;
/**
* 获取任务通过的中间件。
*/
public function middleware(): array
{
return [
Skip::when(function (): bool {
return $this->shouldSkip();
}),
];
}
分派任务
一旦你编写了任务类,你可以使用任务本身的 dispatch
方法来调度它。传递给 dispatch
方法的参数将被传递给任务的构造函数:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}
如果你想要有条件地调度任务,可以使用 dispatchIf
和 dispatchUnless
方法:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新的 Laravel 应用中,sync
驱动程序是默认的队列驱动。这个驱动程序会在当前请求的前台同步执行任务,这在本地开发时通常很方便。如果你希望实际开始将任务排入队列并进行后台处理,你可以在应用的 config/queue.php
配置文件中指定一个不同的队列驱动。
延迟分派
如果你希望指定任务在队列工作者处理之前不能立即可用,可以在调度任务时使用 delay
方法。例如,我们可以指定一个任务在被调度后 10 分钟才开始处理:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}
在某些情况下,任务可能有一个默认的延迟时间。如果你需要绕过这个延迟,并立即调度一个任务进行处理,可以使用 withoutDelay
方法:
ProcessPodcast::dispatch($podcast)->withoutDelay();
Amazon SQS 队列服务的最大延迟时间为 15 分钟。 |
在响应发送到浏览器后再调度任务
另外,dispatchAfterResponse
方法会延迟调度任务,直到 HTTP 响应发送到用户的浏览器(如果你的 Web 服务器使用 FastCGI)。即使队列任务仍在执行,用户也可以继续使用应用程序。通常,这应仅用于处理大约 1 秒钟的任务,例如发送电子邮件。因为这些任务是在当前 HTTP 请求中处理的,所以通过这种方式调度的任务不需要队列工作者运行:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
你还可以调度一个闭包,并在 dispatch
辅助方法后链式调用 afterResponse
方法,以便在 HTTP 响应发送到浏览器后执行一个闭包:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();
同步分派
如果你希望立即调度一个任务(同步执行),可以使用 dispatchSync
方法。使用此方法时,任务不会被加入队列,而是会在当前进程中立即执行:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// 创建播客...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}
任务与数据库事务
虽然在数据库事务中调度任务是完全可以的,但你需要特别注意确保任务能够成功执行。当在事务中调度任务时,可能会发生任务在父事务提交之前被工作者处理的情况。发生这种情况时,你在数据库事务中对模型或数据库记录所做的更新可能尚未反映到数据库中。此外,在事务中创建的任何模型或数据库记录可能还不存在于数据库中。
幸运的是,Laravel 提供了几种方法来解决这个问题。首先,你可以在队列连接的配置数组中设置 after_commit
选项:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],
当 after_commit
选项设置为 true
时,你可以在数据库事务中调度任务;然而,Laravel 会等到父数据库事务提交后才实际调度任务。当然,如果当前没有打开的数据库事务,任务将会立即调度。
如果由于事务中的异常导致事务被回滚,那么在该事务中调度的任务将会被丢弃。
将 |
内联指定提交后调度行为
如果你没有将 after_commit
队列连接配置选项设置为 true
,你仍然可以指示某个特定的任务在所有打开的数据库事务提交后再调度。为此,你可以在调度操作中链式调用 afterCommit
方法:
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();
同样,如果 after_commit
配置选项设置为 true
,你也可以指示某个特定的任务立即调度,而无需等待任何打开的数据库事务提交:
ProcessPodcast::dispatch($podcast)->beforeCommit();
任务链式执行
任务链允许你指定一系列在主任务成功执行后依次运行的队列任务。如果链中的某个任务失败,后续的任务将不会被执行。要执行队列任务链,可以使用 Bus
facade 提供的 chain
方法。Laravel 的命令总线是一个较低级的组件,队列任务调度是建立在其之上的:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
除了链接任务类实例外,你还可以链接闭包:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
在任务中使用 |
链式任务的连接和队列
如果你希望指定用于链式任务的连接和队列,可以使用 onConnection
和 onQueue
方法。这些方法指定了应使用的队列连接和队列名称,除非显式地为队列任务分配了不同的连接/队列:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
向任务链中添加任务
有时你可能需要在任务链中的另一个任务执行时,向现有链中添加任务。可以使用 prependToChain
和 appendToChain
方法来实现:
/**
* 执行任务
*/
public function handle(): void
{
// ...
// 在当前链的前面添加任务,立即在当前任务之后执行...
$this->prependToChain(new TranscribePodcast);
// 在当前链的末尾添加任务,任务将在链末尾执行...
$this->appendToChain(new TranscribePodcast);
}
链式任务失败
在链式任务中,你可以使用 catch
方法指定一个回调,在链中的某个任务失败时调用。给定的回调将接收导致任务失败的 Throwable
实例:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// 链中的某个任务失败了...
})->dispatch();
由于链式回调是序列化的,并由 Laravel 队列在稍后的时间执行,因此在链式回调中不应使用 |
自定义队列和连接
分派到特定队列
通过将任务推送到不同的队列,你可以对队列任务进行 “分类”,甚至优先分配给不同队列的工作进程。请注意,这并不会将任务推送到你的队列配置文件中定义的不同队列“连接”,而只是将任务推送到单一连接中的特定队列。要指定队列,可以在调度任务时使用 onQueue
方法:
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// 创建 podcast...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}
另外,你也可以通过在任务的构造函数中调用 onQueue
方法来指定任务的队列:
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 创建新的任务实例。
*/
public function __construct()
{
$this->onQueue('processing');
}
}
分派到特定连接
如果你的应用程序与多个队列连接进行交互,可以使用 onConnection
方法来指定将任务推送到哪个连接:
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// 创建 podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}
你还可以将 onConnection
和 onQueue
方法连用,来指定任务的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
另外,你也可以通过在任务的构造函数中调用 onConnection
方法来指定任务的连接:
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 创建新的任务实例。
*/
public function __construct()
{
$this->onConnection('sqs');
}
}
指定最大任务尝试次数 / 超时值
最大重试次数
如果你的队列任务遇到错误,你可能不希望它无限制地重试。因此,Laravel 提供了多种方法来指定一个任务可以尝试的次数或尝试的时间。
一种指定任务最大尝试次数的方法是通过 Artisan 命令行的 --tries
参数。这将适用于所有由工作进程处理的任务,除非任务本身指定了最大尝试次数:
php artisan queue:work --tries=3
如果任务超过最大尝试次数,它将被视为 “失败的任务”。有关处理失败任务的更多信息,请参考 失败任务文档。如果 --tries=0
被提供给 queue:work
命令,任务将会无限重试。
你可以在任务类中定义最大尝试次数。如果在任务类中指定了最大尝试次数,它将优先于命令行中提供的 --tries
参数:
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务可以尝试的最大次数
*
* @var int
*/
public $tries = 5;
}
如果你需要对某个特定任务的最大尝试次数进行动态控制,可以在任务类中定义 tries
方法:
/**
* 确定任务可以尝试的最大次数
*/
public function tries(): int
{
return 5;
}
基于时间的重试
除了定义任务在失败之前可以尝试的最大次数外,你还可以定义任务在何时不再尝试,这允许任务在给定的时间范围内尝试任意次数。要定义任务的超时时间,可以在任务类中添加 retryUntil
方法,该方法应该返回一个 DateTime
实例:
use DateTime;
/**
* 确定任务的超时时间
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
你还可以在【排队的事件监听器】中定义 |
最大异常次数
有时你可能希望指定任务可以多次尝试,但如果重试是由于特定数量的未处理异常触发的(而不是通过 release
方法直接释放任务),则任务应该失败。为此,你可以在任务类中定义一个 maxExceptions
属性:
namespace App\Jobs;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务可以尝试的最大次数
*
* @var int
*/
public $tries = 25;
/**
* 允许的最大未处理异常次数,超过此次数则任务失败
*
* @var int
*/
public $maxExceptions = 3;
/**
* 执行任务
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// 获取锁,处理 podcast...
}, function () {
// 无法获取锁,释放任务...
return $this->release(10);
});
}
}
在这个例子中,如果应用程序无法获取 Redis 锁,任务会被释放 10 秒钟并重试最多 25 次。然而,如果任务抛出 3 次未处理异常,任务会失败。
超时
通常,你大概知道你的队列任务需要多长时间。因此,Laravel 允许你指定一个“超时”值。默认情况下,超时时间是 60 秒。如果任务的处理时间超过了指定的超时时间,处理任务的工作进程将退出并报错。通常,工作进程会被服务器上配置的进程管理器自动重启。
你可以使用 --timeout
参数在 Artisan 命令行中指定任务的最大运行时间(单位:秒):
php artisan queue:work --timeout=30
如果任务因为超时而超过最大尝试次数,它将被标记为失败。
你还可以在任务类中定义任务的最大运行时间。如果任务类中指定了超时,它将优先于命令行中指定的超时时间:
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 任务允许运行的最大时间(秒)
*
* @var int
*/
public $timeout = 120;
}
有时,像套接字或外部 HTTP 请求这样的 IO 阻塞过程可能不遵守你指定的超时。因此,在使用这些特性时,你应该总是尝试通过它们的 API 来指定超时。例如,当使用 Guzzle 时,你应该始终指定连接和请求的超时值。
必须安装 |
错误处理
手动释放任务
有时,你可能希望手动将任务释放回队列,以便稍后再进行尝试。你可以通过调用 release
方法来实现:
/**
* 执行任务。
*/
public function handle(): void
{
// ...
$this->release();
}
默认情况下,release
方法会立即将任务释放回队列,供下一个工作进程处理。然而,你也可以通过向 release
方法传递一个整数或日期实例,指示队列在一定时间后再使任务可用:
$this->release(10); // 等待10秒后重试任务
$this->release(now()->addSeconds(10)); // 等待指定的时间后重试任务
手动标记任务为失败
有时你可能需要手动将任务标记为 “失败”。你可以通过调用 fail
方法来实现:
/**
* 执行任务。
*/
public function handle(): void
{
// ...
$this->fail();
}
如果你想标记任务为失败,并且是因为捕获到的异常导致的,你可以将异常传递给 fail
方法。或者,为了方便,你也可以传递一个字符串错误信息,Laravel 会将其转换为异常:
$this->fail($exception); // 传递异常
$this->fail('Something went wrong.'); // 传递错误信息
有关失败任务的更多信息,请查看 处理失败任务的文档。 |
任务批处理
Laravel 的任务批处理功能允许你轻松地执行一批任务,并在任务批次完成执行后执行某些操作。在开始之前,你应该创建一个数据库迁移来构建一个表,该表将包含有关任务批次的元信息,例如其完成百分比。你可以使用 make:queue-batches-table
Artisan 命令生成该迁移:
php artisan make:queue-batches-table
php artisan migrate
定义可批处理任务
要定义一个可批处理的任务,你应该像正常一样创建一个队列任务;但是,你需要在任务类中添加 Illuminate\Bus\Batchable
特性。这个特性提供了对 batch
方法的访问,可以用来检索当前任务正在执行的批次:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ImportCsv implements ShouldQueue
{
use Batchable, Queueable;
/**
* 执行任务。
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// 判断批次是否已取消...
return;
}
// 导入 CSV 文件的一部分...
}
}
分派批处理任务
要调度一批任务,你应该使用 Bus
门面的 batch
方法。当然,批量处理最常用于结合完成回调一起使用。因此,你可以使用 then
、catch
和 finally
方法来定义批次的完成回调。每个回调在调用时都会接收到一个 Illuminate\Bus\Batch
实例。以下是一个示例,假设我们正在调度一批处理 CSV 文件中指定行数的任务:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// 批次已创建,但尚未添加任务...
})->progress(function (Batch $batch) {
// 一个任务已成功完成...
})->then(function (Batch $batch) {
// 所有任务都成功完成...
})->catch(function (Batch $batch, Throwable $e) {
// 检测到第一个任务失败...
})->finally(function (Batch $batch) {
// 批次执行完成...
})->dispatch();
return $batch->id;
批次的 ID 可以通过 $batch->id
属性访问,在任务调度后可以用来查询 Laravel 命令总线中关于该批次的信息。
由于批量回调是序列化的,并且在稍后由 Laravel 队列执行,因此在回调中不应使用 |
链式与批处理
你可以通过将链式任务放在数组中,在批次中定义一组链式任务。例如,我们可以并行执行两组任务链,并在两组任务链都完成后执行回调:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// ...
})->dispatch();
反过来,你也可以在链式任务中运行批量任务,通过在【链】中定义批次。例如,你可以首先运行一批任务来发布多个播客,然后运行另一批任务来发送发布通知:
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();
向批处理中添加任务
有时,从批量任务内部向批次添加额外的任务是很有用的。这种模式在需要批量处理成千上万个任务时尤其有用,因为这些任务可能会在网页请求期间耗费太多时间。因此,你可能希望先调度一批 “加载” 任务,这些任务会将更多的任务添加到批次中:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// 所有任务成功完成...
})->name('Import Contacts')->dispatch();
在这个例子中,我们使用 LoadImportBatch
任务来向批次中添加更多的任务。为此,我们可以使用批次实例上的 add
方法,该方法可以通过任务的 batch
方法访问:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* 执行任务。
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
// 向批次添加 1000 个任务
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
你只能从属于同一批次的任务中向批次添加任务。 |
检查批处理
Illuminate\Bus\Batch
实例提供了多种属性和方法,帮助你与批次中的任务进行交互和检查批次的状态:
// 批次的 UUID...
$batch->id;
// 批次的名称(如果有的话)...
$batch->name;
// 分配给批次的任务数量...
$batch->totalJobs;
// 尚未被队列处理的任务数量...
$batch->pendingJobs;
// 失败的任务数量...
$batch->failedJobs;
// 到目前为止已经处理的任务数量...
$batch->processedJobs();
// 批次的完成百分比(0-100)...
$batch->progress();
// 表示批次是否已完成执行...
$batch->finished();
// 取消批次的执行...
$batch->cancel();
// 表示批次是否已被取消...
$batch->cancelled();
从路由返回批次
所有 Illuminate\Bus\Batch
实例都是可 JSON 序列化的,这意味着你可以直接从应用的某个路由返回它们,以获取一个包含批次信息的 JSON 响应,其中包括批次的完成进度。这使得在应用的 UI 中显示批次的完成进度变得方便。
要通过批次的 ID 获取批次,你可以使用 Bus
facade 的 findBatch
方法:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
取消批处理
有时你可能需要取消某个批次的执行。你可以通过调用 Illuminate\Bus\Batch
实例的 cancel
方法来实现这一点:
/**
* 执行任务。
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
return $this->batch()->cancel();
}
if ($this->batch()->cancelled()) {
return;
}
}
如你在之前的示例中所注意到的,批量任务通常应该在继续执行之前检查它们对应的批次是否已被取消。然而,为了方便,你可以为任务指定 SkipIfBatchCancelled
【中间件】。正如其名字所示,这个中间件会指示 Laravel 如果对应的批次已被取消,则不处理该任务:
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* 获取任务应通过的中间件。
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}
批处理失败
当批量任务失败时,如果已定义 catch
回调,该回调将被触发。此回调仅在批量中的第一个任务失败时被调用。
清理批处理
没有清理功能的话,job_batches
表可能会快速积累记录。为了减少这种情况,应该每天调度运行 queue:prune-batches
Artisan 命令:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches')->daily();
默认情况下,所有完成超过 24 小时的批次将会被清理。你可以使用 --hours
选项来决定保留批次数据的时间。例如,以下命令会删除所有完成超过 48 小时的批次:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48')->daily();
有时,job_batches
表可能会积累未完成的批次记录,例如任务失败且该任务从未成功重试的批次。你可以使用 unfinished
选项来指示 queue:prune-batches
命令清理这些未完成的批次记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同样,job_batches
表也可能会积累已取消的批次记录。你可以使用 --cancelled
选项来指示 queue:prune-batches
命令清理这些已取消的批次记录:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
将批处理存储在 DynamoDB 中
Laravel 还支持将批次的元数据存储在 DynamoDB 中,而不是关系型数据库。为了实现这一点,您需要手动创建一个 DynamoDB 表来存储所有的批次记录。
通常,表名应为 job_batches
,但是您可以根据应用程序队列配置文件中的 queue.batching.table
配置值来命名该表。
DynamoDB 批次表配置
job_batches
表应具有一个名为 application
的字符串主分区键和一个名为 id
的字符串主排序键。键中的 application
部分将包含应用程序的名称,该名称由应用程序的 app
配置文件中的 name
配置值定义。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用相同的表来存储多个 Laravel 应用程序的作业批次。
此外,如果您希望利用自动批次修剪功能,可以为表定义 ttl
属性。
DynamoDB 配置
接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 进行通信:
composer require aws/aws-sdk-php
然后,将 queue.batching.driver
配置选项的值设置为 dynamodb
。此外,您还应在批次配置数组中定义 key
、secret
和 region
配置选项。这些选项将用于与 AWS 进行身份验证。当使用 dynamodb
驱动程序时,queue.batching.database
配置选项不再需要:
'batching' => [
'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],
在 DynamoDB 中修剪批次
当使用 【DynamoDB】 存储作业批次信息时,通常用于修剪关系型数据库中的批次的命令将不起作用。相反,您可以利用 【DynamoDB 的原生 TTL(过期时间)功能】来自动删除旧批次记录。
如果您为 DynamoDB 表定义了 ttl
属性,则可以定义配置参数来指导 Laravel 如何修剪批次记录。queue.batching.ttl_attribute
配置值定义了保存 TTL 的属性名称,而 queue.batching.ttl
配置值定义了从记录最后一次更新时间起,可以将批次记录从 DynamoDB 表中删除的秒数:
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 天...
],
队列闭包
除了将作业类调度到队列中,您还可以调度一个闭包。这对于需要在当前请求周期之外执行的快速、简单的任务非常有用。当将闭包调度到队列时,闭包的代码内容会被加密签名,以确保它在传输过程中不会被修改:
$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
通过 catch
方法,您可以提供一个闭包,如果队列中的闭包在尝试所有配置的重试次数后仍未成功完成,则该闭包会被执行:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// 该作业失败了...
});
由于 |
运行队列工作进程
queue:work
命令
Laravel 包含一个 Artisan 命令,可以启动队列工作程序,并在将新的任务推送到队列时处理它们。你可以使用 queue:work
Artisan 命令来运行该工作程序。请注意,一旦启动 queue:work
命令,它将持续运行,直到手动停止或关闭终端:
php artisan queue:work
为了让 |
你还可以在调用 queue:work
命令时使用 -v
标志,以便在命令输出中包含已处理任务的 ID:
php artisan queue:work -v
请记住,队列工作程序是长时间运行的进程,并会将启动的应用状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。所以,在部署过程中,请确保重启队列工作程序。此外,应用中创建或修改的任何静态状态在任务之间不会自动重置。
或者,你可以运行 queue:listen
命令。当使用 queue:listen
命令时,你不必手动重启工作程序来重新加载更新的代码或重置应用状态;然而,这个命令比 queue:work
命令效率低得多:
php artisan queue:listen
运行多个队列工作程序
要分配多个工作程序来处理队列并并发处理任务,你只需启动多个 queue:work
进程。你可以通过多个终端标签在本地进行操作,或者在生产环境中通过进程管理器的配置设置来完成。使用 Supervisor 时,你可以使用 numprocs
配置值来启动多个进程。
指定连接和队列
你还可以指定工作程序应使用的队列连接。传递给 queue:work
命令的连接名称应该与 config/queue.php
配置文件中定义的连接之一相对应:
php artisan queue:work redis
默认情况下,queue:work
命令只处理给定连接的默认队列。然而,你可以进一步自定义队列工作程序,只处理给定连接的特定队列。例如,如果你所有的电子邮件都在 Redis
队列连接中的 emails
队列中处理,你可以使用以下命令启动一个仅处理该队列的工作程序:
php artisan queue:work redis --queue=emails
处理指定数量的任务
--once
选项可用于指示工作程序只处理队列中的一个任务:
php artisan queue:work --once
--max-jobs
选项可用于指示工作程序处理给定数量的任务,然后退出。这个选项在与 Supervisor 配合使用时非常有用,能够在处理一定数量的任务后自动重启工作程序,释放它们可能累积的内存:
php artisan queue:work --max-jobs=1000
处理所有队列中的任务然后退出
--stop-when-empty
选项可用于指示工作程序处理完所有任务后优雅地退出。这个选项在 Docker 容器中处理 Laravel 队列时非常有用,如果你希望在队列为空时关闭容器:
php artisan queue:work --stop-when-empty
处理任务指定的秒数
--max-time
选项可用于指示工作程序处理任务指定的秒数,然后退出。这个选项与 【Supervisor】 配合使用时也很有用,能够在处理一定时间的任务后自动重启工作程序,释放它们可能累积的内存:
# 处理一个小时的任务,然后退出...
php artisan queue:work --max-time=3600
工作程序睡眠时长
当队列中有任务时,工作程序将不断处理任务,任务之间没有延迟。然而,sleep
选项决定了如果没有任务可用时,工作程序将 “睡眠” 多少秒。自然,在睡眠期间,工作程序不会处理新任务:
php artisan queue:work --sleep=3
队列优先级
有时候,你可能希望优先处理队列中的任务。例如,在 config/queue.php
配置文件中,你可以将 Redis 连接的默认队列设置为低优先级。然而,偶尔你可能希望将一个任务推送到高优先级的队列,像这样:
dispatch((new Job)->onQueue('high'));
为了启动一个工作程序,确保所有高优先级队列的任务在继续处理低优先级队列的任务之前被处理,可以在 work
命令中传递一个由逗号分隔的队列名称列表:
php artisan queue:work --queue=high,low
队列工作进程与部署
由于队列工作程序是长期运行的进程,它们在没有重启的情况下不会注意到代码的更改。因此,部署使用队列工作程序的应用程序的最简单方法是在部署过程中重启工作程序。你可以通过执行 queue:restart
命令来优雅地重启所有工作程序:
php artisan queue:restart
该命令会指示所有队列工作程序在完成当前任务后优雅地退出,以确保不会丢失现有的任务。由于队列工作程序会在执行 queue:restart
命令时退出,因此你应该使用进程管理工具(如 【Supervisor】)来自动重启队列工作程序。
队列使用【缓存】来存储重启信号,因此在使用此功能之前,你应该确认你的应用程序已正确配置了缓存驱动。 |
任务过期与超时
任务过期
在 config/queue.php
配置文件中,每个队列连接都定义了一个 retry_after
选项。该选项指定了队列连接在重新尝试一个正在处理的任务之前应等待的秒数。例如,如果 retry_after
的值设置为 90,当任务已经处理了 90 秒而没有被释放或删除时,任务将被重新放回队列。通常,你应该将 retry_after
的值设置为任务合理完成处理所需的最大秒数。
唯一不包含 |
工作程序超时
queue:work
Artisan 命令提供了一个 --timeout
选项。默认情况下,--timeout
的值为 60 秒。如果一个任务的处理时间超过了 timeout
值指定的秒数,处理该任务的工作程序将以错误状态退出。通常,工作程序会被配置在服务器上的进程管理器自动重启:
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项是不同的,但它们共同工作,确保任务不会丢失,并且任务只会成功处理一次。
|
监控配置
在生产环境中,你需要一种方法来保持 queue:work
进程的持续运行。queue:work
进程可能因各种原因停止运行,例如超出了工作程序的超时时间或执行了 queue:restart
命令。
因此,你需要配置一个进程监控工具,能够检测到 queue:work
进程退出并自动重新启动它们。此外,进程监控工具还可以让你指定希望同时运行多少个 queue:work
进程。Supervisor 是一个常见的 Linux 环境下的进程监控工具,接下来的文档将讨论如何配置它。
安装 Supervisor
Supervisor 是一个适用于 Linux 操作系统的进程监控工具,能够在 queue:work
进程失败时自动重启它们。要在 Ubuntu 上安装 Supervisor,你可以使用以下命令:
sudo apt-get install supervisor
如果自己配置和管理 Supervisor 感到困难,可以考虑使用 【Laravel Forge】,它会为你的生产环境 Laravel 项目自动安装和配置 Supervisor。 |
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d
目录中。在这个目录下,你可以创建任意数量的配置文件,指示 Supervisor 如何监控你的进程。例如,我们可以创建一个 laravel-worker.conf
文件,来启动并监控 queue:work
进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
在这个例子中,numprocs
指令将指示 Supervisor 启动 8 个 queue:work
进程,并监控它们,如果进程失败,会自动重启它们。你应该根据你的队列连接和工作程序选项修改 command
指令。
你需要确保 |
处理失败任务
有时,你的队列任务会失败。别担心,事情并不总是按计划进行!Laravel 提供了一个方便的方式来指定任务应该被尝试的最大次数。一个异步任务超过这个最大尝试次数后,它将被插入到 failed_jobs
数据库表中。同步分派的任务如果失败,异常会立即由应用程序处理,而不会存储在该表中。
通常,新的 Laravel 应用程序中会已经包含一个用于创建 failed_jobs
表的迁移文件。然而,如果你的应用没有包含该表的迁移,你可以使用 make:queue-failed-table
命令来创建迁移:
php artisan make:queue-failed-table
php artisan migrate
在运行队列工作程序进程时,你可以使用 --tries
选项指定任务最大重试次数。如果没有为 --tries
选项指定值,任务将只会尝试一次,或者根据任务类的 $tries
属性进行重试:
php artisan queue:work redis --tries=3
使用 --backoff
选项,你可以指定 Laravel 在任务遇到异常时应该等待多少秒再重试。默认情况下,任务会立即被释放回队列,等待再次尝试:
php artisan queue:work redis --tries=3 --backoff=3
如果你想要为每个任务单独配置重试延迟,你可以在任务类中定义一个 backoff
属性:
/**
* 任务重试前等待的秒数。
*
* @var int
*/
public $backoff = 3;
如果你需要更复杂的逻辑来确定任务的重试延迟,可以在任务类中定义一个 backoff
方法:
/**
* 计算任务重试前等待的秒数。
*/
public function backoff(): int
{
return 3;
}
你可以通过返回一个包含重试延迟值的数组来轻松配置 “指数” 重试延迟。在以下示例中,第一次重试的延迟为 1 秒,第二次重试为 5 秒,第三次重试为 10 秒,之后的所有重试也都为 10 秒:
/**
* 计算任务重试前等待的秒数。
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}
清理失败任务
当某个任务失败时,你可能希望向用户发送通知,或者撤销任务可能已部分完成的操作。为此,你可以在任务类中定义一个 failed
方法。导致任务失败的 Throwable
实例会被传递给 failed
方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* 创建一个新的任务实例。
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* 执行任务。
*/
public function handle(AudioProcessor $processor): void
{
// 处理上传的播客...
}
/**
* 处理任务失败。
*/
public function failed(?Throwable $exception): void
{
// 向用户发送失败通知等...
}
}
在调用 |
重试失败任务
要查看所有已插入 failed_jobs
数据库表中的失败任务,你可以使用 queue:failed
Artisan 命令:
php artisan queue:failed
queue:failed
命令将列出任务的 ID、连接、队列、失败时间以及其它有关任务的信息。任务 ID 可用于重新尝试失败的任务。例如,要重试一个 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
的失败任务,可以使用以下命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
如果需要,你可以传递多个任务 ID 给命令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
你也可以选择只重试特定队列的所有失败任务:
php artisan queue:retry --queue=name
要重试所有失败的任务,可以执行 queue:retry
命令并传递 all
作为 ID:
php artisan queue:retry all
如果你想删除一个失败的任务,可以使用 queue:forget
命令:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
当使用 Horizon 时,应该使用 |
要从 failed_jobs
表中删除所有失败的任务,可以使用 queue:flush
命令:
php artisan queue:flush
忽略缺失模型
当将 Eloquent 模型注入到一个任务中时,模型会在放入队列之前自动进行序列化,并在任务处理时从数据库重新检索。然而,如果在任务等待被工作进程处理期间,模型被删除,任务可能会因 ModelNotFoundException
异常而失败。
为了方便起见,你可以选择在模型缺失时自动删除任务,方法是将任务的 deleteWhenMissingModels
属性设置为 true
。当此属性设置为 true
时,Laravel 会在没有抛出异常的情况下悄悄丢弃该任务:
/**
* 如果任务的模型不存在,则删除该任务。
*
* @var bool
*/
public $deleteWhenMissingModels = true;
清理失败任务
你可以通过执行 queue:prune-failed
Artisan 命令来清理应用程序中的 failed_jobs
表记录:
php artisan queue:prune-failed
默认情况下,所有超过 24 小时的失败任务记录将被清理。如果你提供 --hours
选项,只有在过去 N 小时内插入的失败任务记录会被保留。例如,以下命令将删除所有插入时间超过 48 小时的失败任务记录:
php artisan queue:prune-failed --hours=48
将失败任务存储在 DynamoDB 中
Laravel 还支持将失败的任务记录存储在 DynamoDB 中,而不是关系型数据库表中。然而,你需要手动创建一个 DynamoDB 表来存储所有失败的任务记录。通常,这个表应该命名为 failed_jobs
,但你应该根据应用程序的队列配置文件中的 queue.failed.table
配置值来命名该表。
failed_jobs
表应该有一个名为 application
的字符串类型主分区键和一个名为 uuid
的字符串类型主排序键。application
键部分将包含你的应用程序名称,该名称由应用程序配置文件中的 name
配置值定义。由于应用程序名称是 DynamoDB 表的键的一部分,你可以使用同一个表来存储多个 Laravel 应用程序的失败任务。
此外,确保你安装了 AWS SDK,以便你的 Laravel 应用程序能够与 Amazon DynamoDB 进行通信:
composer require aws/aws-sdk-php
接下来,将 queue.failed.driver
配置选项的值设置为 dynamodb
。此外,你应该在失败任务配置数组中定义 key
、secret
和 region
配置选项。这些选项将用于与 AWS 进行身份验证。当使用 dynamodb
驱动时,queue.failed.database
配置选项将不再需要:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],
禁用失败任务存储
你可以通过将 queue.failed.driver
配置选项的值设置为 null
,指示 Laravel 丢弃失败的任务,而不将其存储。通常,这可以通过设置 QUEUE_FAILED_DRIVER
环境变量来实现:
QUEUE_FAILED_DRIVER=null
失败任务事件
如果你希望注册一个事件监听器,当任务失败时会被触发,可以使用 Queue
门面的 failing
方法。例如,我们可以在 Laravel 提供的 AppServiceProvider
的 boot
方法中附加一个闭包来处理这个事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册应用服务。
*/
public function register(): void
{
// ...
}
/**
* 启动应用服务。
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
清除队列中的任务
在使用 Horizon 时,应该使用 |
如果你希望删除默认连接的默认队列中的所有任务,可以使用 queue:clear
Artisan 命令:
php artisan queue:clear
你还可以提供 connection
参数和 queue
选项,以便从特定的连接和队列中删除任务:
php artisan queue:clear redis --queue=emails
清除队列中的任务仅适用于 SQS、Redis 和数据库队列驱动。此外,SQS 消息删除过程可能需要最多 60 秒,因此在你清空队列后最多 60 秒内发送到 SQS 队列的任务也可能被删除。 |
监控队列
如果你的队列接收到大量任务,可能会导致队列超载,从而导致任务处理的等待时间过长。如果需要,Laravel 可以在队列中的任务数量超过指定阈值时提醒你。
首先,你应该将 queue:monitor
命令安排每分钟运行一次。该命令接受你希望监控的队列名称以及你希望设置的任务数量阈值:
php artisan queue:monitor redis:default,redis:deployments --max=100
仅仅安排此命令并不足以触发当队列任务数量超过阈值时的通知。命令在遇到任务数量超过阈值的队列时,会触发一个 Illuminate\Queue\Events\QueueBusy
事件。你可以在应用的 AppServiceProvider
中监听此事件,以便向你或你的开发团队发送通知:
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* 启动任何应用服务。
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', 'dev@example.com')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}
测试
在测试调度任务的代码时,你可能希望指示 Laravel 不实际执行任务,因为任务的代码可以独立于调度它的代码进行测试。自然地,要测试任务本身,你可以直接实例化任务类并在测试中调用 handle
方法。
你可以使用 Queue
facade 的 fake
方法来防止任务被真正推送到队列中。在调用 Queue::fake
方法之后,你可以断言应用是否尝试将任务推送到队列:
<?php
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
test('orders can be shipped', function () {
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});
<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a Closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}
你可以向 assertPushed
或 assertNotPushed
方法传递一个闭包,来断言某个任务是否被推送,并且该任务符合给定的“真值测试”。如果至少有一个任务通过了该测试,断言将会成功:
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
伪造部分任务
如果你只需要模拟特定的任务,同时允许其它任务正常执行,你可以将需要模拟的任务类名传递给 fake
方法:
test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}
你也可以使用 except
方法模拟所有任务,除了指定的任务集合:
Queue::fake()->except([
ShipOrder::class,
]);
测试任务链
要测试任务链,你需要利用 Bus
facade 的模拟功能。Bus
facade 的 assertChained
方法可以用来断言任务链是否被分发。assertChained
方法接受一个任务链数组作为第一个参数:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);
如上例所示,任务链的数组可以是任务类的类名数组。你也可以提供一个实际的任务实例数组。在这种情况下,Laravel 会确保任务实例属于相同的类,并且拥有与应用程序中分发的任务链相同的属性值:
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);
你可以使用 assertDispatchedWithoutChain
方法断言某个任务在没有任务链的情况下被推送:
Bus::assertDispatchedWithoutChain(ShipOrder::class);
测试任务链的修改
如果某个任务在任务链前面或后面添加了其它任务,你可以使用任务的 assertHasChain
方法来断言任务是否具有预期的剩余任务链:
$job = new ProcessPodcast;
$job->handle();
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);
assertDoesntHaveChain
方法可以用来断言任务的剩余链为空:
$job->assertDoesntHaveChain();
测试链中的批量任务
如果你的任务链包含一个任务批量,你可以通过在链断言中插入 Bus::chainedBatch
来断言链中的批量任务符合预期:
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);
测试任务批处理
Bus
facade 的 assertBatched
方法可用于断言是否分发了一个批量任务。传递给 assertBatched
方法的闭包会接收一个 Illuminate\Bus\PendingBatch
实例,可以用来检查批量任务中的任务:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});
你可以使用 assertBatchCount
方法断言是否分发了给定数量的批量任务:
Bus::assertBatchCount(3);
你还可以使用 assertNothingBatched
方法断言没有分发任何批量任务:
Bus::assertNothingBatched();
测试任务 / 队列交互
有时,你可能需要测试一个排队任务是否将自己重新放回队列中,或者你可能需要测试任务是否删除了自己。你可以通过实例化任务并调用 withFakeQueueInteractions
方法来测试这些队列交互。
一旦任务的队列交互被伪造,你可以调用任务的 handle
方法。调用任务后,可以使用 assertReleased
、assertDeleted
、assertNotDeleted
、assertFailed
和 assertNotFailed
方法对任务的队列交互进行断言:
use App\Jobs\ProcessPodcast;
$job = (new ProcessPodcast)->withFakeQueueInteractions();
$job->handle();
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertNotFailed();
任务事件
Bus
facade 的 assertBatched
方法可以用来断言一批任务是否被分发。传递给 assertBatched
方法的闭包接收一个 Illuminate\Bus\PendingBatch
实例,你可以使用它来检查批量任务中的任务:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'import-csv' &&
$batch->jobs->count() === 10;
});
你可以使用 assertBatchCount
方法来断言是否分发了指定数量的批量任务:
Bus::assertBatchCount(3);
你也可以使用 assertNothingBatched
方法来断言没有分发任何批量任务:
Bus::assertNothingBatched();