Skip to content

队列

介绍

在构建 Web 应用程序时,您可能会遇到一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间执行时间过长。幸运的是,Laravel 允许您轻松创建可以在后台处理的队列作业。通过将耗时的任务移到队列中,您的应用程序可以以极快的速度响应 Web 请求,并为客户提供更好的用户体验。

Laravel 队列提供了跨各种不同队列后端的统一队列 API,例如 Amazon SQSRedis 或甚至是关系数据库。

Laravel 的队列配置选项存储在应用程序的 config/queue.php 配置文件中。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,包括数据库、Amazon SQSRedisBeanstalkd 驱动程序,以及一个同步驱动程序,该驱动程序将在本地开发期间立即执行作业。还包括一个 null 队列驱动程序,该驱动程序会丢弃队列作业。

lightbulb

Laravel 现在提供了 Horizon,这是一个用于 Redis 驱动队列的美丽仪表板和配置系统。有关更多信息,请查看完整的 Horizon 文档

连接与队列

在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别非常重要。在您的 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义了与后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。但是,任何给定的队列连接可能有多个“队列”,可以被视为不同的堆栈或队列作业的堆。

请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是将作业发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确定义应将其调度到哪个队列,则作业将被放置在连接配置的 queue 属性中定义的队列中:

php
use App\Jobs\ProcessPodcast;

// 此作业被发送到默认连接的默认队列...
ProcessPodcast::dispatch();

// 此作业被发送到默认连接的“emails”队列...
ProcessPodcast::dispatch()->onQueue('emails');

某些应用程序可能不需要将作业推送到多个队列,而是更喜欢拥有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为 Laravel 队列工作者允许您指定它应按优先级处理哪些队列。例如,如果您将作业推送到 high 队列,您可以运行一个工作者,给予它们更高的处理优先级:

shell
php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

要使用 database 队列驱动程序,您需要一个数据库表来保存作业。要生成创建此表的迁移,请运行 queue:table Artisan 命令。创建迁移后,您可以使用 migrate 命令迁移数据库:

shell
php artisan queue:table

php artisan migrate

最后,不要忘记通过更新应用程序的 .env 文件中的 QUEUE_CONNECTION 变量来指示应用程序使用 database 驱动程序:

php
QUEUE_CONNECTION=database

Redis

要使用 redis 队列驱动程序,您应该在 config/database.php 配置文件中配置一个 Redis 数据库连接。

Redis 集群

如果您的 Redis 队列连接使用 Redis 集群,则您的队列名称必须包含 键哈希标签。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:

php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

阻塞

使用 Redis 队列时,您可以使用 block_for 配置选项来指定驱动程序在进入工作者循环并重新轮询 Redis 数据库之前应等待作业可用的时间。

根据您的队列负载调整此值可能比持续轮询 Redis 数据库以获取新作业更有效。例如,您可以将值设置为 5,以指示驱动程序在等待作业可用时应阻塞五秒钟:

php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],
exclamation

block_for 设置为 0 将导致队列工作者无限期阻塞,直到作业可用。这也将阻止信号(如 SIGTERM)在处理下一个作业之前被处理。

其他驱动程序先决条件

以下依赖项是列出的队列驱动程序所需的。这些依赖项可以通过 Composer 包管理器安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0 或 phpredis PHP 扩展

创建作业

生成作业类

默认情况下,应用程序的所有可队列作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当您运行 make:job Artisan 命令时将创建它:

shell
php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,指示 Laravel 该作业应被推送到队列以异步运行。

lightbulb

作业存根可以使用 存根发布 进行自定义。

类结构

作业类非常简单,通常只包含一个 handle 方法,该方法在作业被队列处理时被调用。首先,让我们看一个示例作业类。在此示例中,我们假装管理一个播客发布服务,并需要在发布之前处理上传的播客文件:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 播客实例。
     *
     * @var \App\Models\Podcast
     */
    public $podcast;

    /**
     * 创建一个新的作业实例。
     *
     * @param  App\Models\Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行作业。
     *
     * @param  App\Services\AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客...
    }
}

在此示例中,请注意我们能够将 Eloquent 模型 直接传递到队列作业的构造函数中。由于作业使用的 SerializesModels 特性,Eloquent 模型及其加载的关系将在作业处理时被优雅地序列化和反序列化。

如果您的队列作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符将被序列化到队列中。当作业实际被处理时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化方法允许将更小的作业负载发送到您的队列驱动程序。

handle 方法依赖注入

handle 方法在作业被队列处理时被调用。请注意,我们能够在作业的 handle 方法上进行类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。

如果您希望完全控制容器如何将依赖项注入到 handle 方法中,可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收作业和容器。在回调中,您可以随意调用 handle 方法。通常,您应该从 App\Providers\AppServiceProvider 服务提供者boot 方法中调用此方法:

php
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function ($job, $app) {
    return $job->handle($app->make(AudioProcessor::class));
});
exclamation

二进制数据(如原始图像内容)应在传递给队列作业之前通过 base64_encode 函数进行处理。否则,作业在放置到队列时可能无法正确序列化为 JSON。

队列关系

由于加载的关系也会被序列化,因此序列化的作业字符串有时可能会变得相当大。为了防止关系被序列化,您可以在设置属性值时调用模型的 withoutRelations 方法。此方法将返回一个没有加载关系的模型实例:

php
/**
 * 创建一个新的作业实例。
 *
 * @param  \App\Models\Podcast  $podcast
 * @return void
 */
public function __construct(Podcast $podcast)
{
    $this->podcast = $podcast->withoutRelations();
}

此外,当作业被反序列化并且模型关系从数据库中重新检索时,它们将被完整检索。任何在作业排队过程中序列化之前应用的关系约束在作业反序列化时将不再适用。因此,如果您希望处理给定关系的子集,您应该在队列作业中重新约束该关系。

唯一作业

exclamation

唯一作业需要支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。此外,唯一作业约束不适用于批处理中的作业。

有时,您可能希望确保在任何时候队列中只有一个特定作业的实例。您可以通过在作业类上实现 ShouldBeUnique 接口来实现。这一接口不需要您在类上定义任何其他方法:

php
<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...
}

在上面的示例中,UpdateSearchIndex 作业是唯一的。因此,如果队列中已经有另一个实例的作业并且尚未完成处理,则不会调度该作业。

在某些情况下,您可能希望定义一个特定的“键”来使作业唯一,或者您可能希望指定一个超时,以便作业不再保持唯一性。为此,您可以在作业类上定义 uniqueIduniqueFor 属性或方法:

php
<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * 产品实例。
     *
     * @var \App\Product
     */
    public $product;

    /**
     * 作业唯一锁将被释放的秒数。
     *
     * @var int
     */
    public $uniqueFor = 3600;

    /**
     * 作业的唯一 ID。
     *
     * @return string
     */
    public function uniqueId()
    {
        return $this->product->id;
    }
}

在上面的示例中,UpdateSearchIndex 作业通过产品 ID 唯一。因此,具有相同产品 ID 的作业的新调度将被忽略,直到现有作业完成处理。此外,如果现有作业在一小时内未处理,唯一锁将被释放,并且可以将另一个具有相同唯一键的作业调度到队列。

exclamation

如果您的应用程序从多个 Web 服务器或容器调度作业,您应该确保所有服务器都与同一个中央缓存服务器通信,以便 Laravel 可以准确确定作业是否唯一。

保持作业唯一直到处理开始

默认情况下,唯一作业在作业完成处理或失败所有重试尝试后被“解锁”。但是,在某些情况下,您可能希望在作业处理之前立即解锁作业。为此,您的作业应实现 ShouldBeUniqueUntilProcessing 合同,而不是 ShouldBeUnique 合同:

php
<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

唯一作业锁

在后台,当调度 ShouldBeUnique 作业时,Laravel 尝试获取具有 uniqueId 键的 。如果未获取锁,则不会调度作业。此锁在作业完成处理或失败所有重试尝试时被释放。默认情况下,Laravel 将使用默认缓存驱动程序获取此锁。但是,如果您希望使用其他驱动程序获取锁,可以定义一个 uniqueVia 方法,该方法返回应使用的缓存驱动程序:

php
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    ...

    /**
     * 获取唯一作业锁的缓存驱动程序。
     *
     * @return \Illuminate\Contracts\Cache\Repository
     */
    public function uniqueVia()
    {
        return Cache::driver('redis');
    }
}
lightbulb

如果您只需要限制作业的并发处理,请使用 WithoutOverlapping 作业中间件。

作业中间件

作业中间件允许您在队列作业的执行过程中包装自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下 handle 方法,该方法利用 Laravel 的 Redis 速率限制功能,每五秒钟只允许一个作业处理一次:

php
use Illuminate\Support\Facades\Redis;

/**
 * 执行作业。
 *
 * @return void
 */
public function handle()
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('获取到锁...');

        // 处理作业...
    }, function () {
        // 无法获取锁...

        return $this->release(5);
    });
}

虽然此代码是有效的,但 handle 方法的实现变得嘈杂,因为它被 Redis 速率限制逻辑所混杂。此外,必须为我们希望速率限制的任何其他作业复制此速率限制逻辑。

我们可以定义一个处理速率限制的作业中间件,而不是在 handle 方法中进行速率限制。Laravel 没有作业中间件的默认位置,因此您可以将作业中间件放置在应用程序中的任何位置。在此示例中,我们将中间件放置在 app/Jobs/Middleware 目录中:

php
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * 处理队列作业。
     *
     * @param  mixed  $job
     * @param  callable  $next
     * @return mixed
     */
    public function handle($job, $next)
    {
        Redis::throttle('key')
                ->block(0)->allow(1)->every(5)
                ->then(function () use ($job, $next) {
                    // 获取到锁...

                    $next($job);
                }, function () use ($job) {
                    // 无法获取锁...

                    $job->release(5);
                });
    }
}

如您所见,像 路由中间件 一样,作业中间件接收正在处理的作业和一个应调用的回调以继续处理作业。

创建作业中间件后,可以通过从作业的 middleware 方法返回它们来将它们附加到作业。此方法在 make:job Artisan 命令生成的作业中不存在,因此您需要手动将其添加到作业类中:

php
use App\Jobs\Middleware\RateLimited;

/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [new RateLimited];
}
lightbulb

作业中间件也可以分配给可队列的事件监听器、邮件和通知。

速率限制

虽然我们刚刚演示了如何编写自己的速率限制作业中间件,但 Laravel 实际上包含了一个速率限制中间件,您可以利用它来速率限制作业。像 路由速率限制器 一样,作业速率限制器是使用 RateLimiter facade 的 for 方法定义的。

例如,您可能希望允许用户每小时备份一次数据,而对高级客户不施加此类限制。为此,您可以在 AppServiceProviderboot 方法中定义一个 RateLimiter

php
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * 启动任何应用程序服务。
 *
 * @return void
 */
public function boot()
{
    RateLimiter::for('backups', function ($job) {
        return $job->user->vipCustomer()
                    ? Limit::none()
                    : Limit::perHour(1)->by($job->user->id);
    });
}

在上面的示例中,我们定义了一个每小时的速率限制;但是,您可以轻松地使用 perMinute 方法定义基于分钟的速率限制。此外,您可以将任何值传递给速率限制的 by 方法;但是,此值最常用于按客户分段速率限制:

php
return Limit::perMinute(50)->by($job->user->id);

定义速率限制后,您可以使用 Illuminate\Queue\Middleware\RateLimited 中间件将速率限制器附加到备份作业。每次作业超过速率限制时,此中间件将根据速率限制持续时间将作业释放回队列,并进行适当的延迟。

php
use Illuminate\Queue\Middleware\RateLimited;

/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [new RateLimited('backups')];
}

将速率限制的作业释放回队列仍会增加作业的总 attempts 数量。您可能希望相应地调整作业类上的 triesmaxExceptions 属性。或者,您可能希望使用 retryUntil 方法 来定义作业不再尝试的时间。

如果您不希望在速率限制时重试作业,可以使用 dontRelease 方法:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [(new RateLimited('backups'))->dontRelease()];
}
lightbulb

如果您使用 Redis,可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件,该中间件针对 Redis 进行了微调,比基本速率限制中间件更高效。

防止作业重叠

Laravel 包含一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,允许您根据任意键防止作业重叠。当队列作业正在修改一个只能由一个作业同时修改的资源时,这可能会很有帮助。

例如,假设您有一个更新用户信用评分的队列作业,并且您希望防止同一用户 ID 的信用评分更新作业重叠。为此,您可以从作业的 middleware 方法返回 WithoutOverlapping 中间件:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [new WithoutOverlapping($this->user->id)];
}

任何相同类型的重叠作业将被释放回队列。您还可以指定必须经过的秒数,然后才会再次尝试释放的作业:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

如果您希望立即删除任何重叠作业,以便它们不会被重试,可以使用 dontRelease 方法:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping 中间件由 Laravel 的原子锁功能提供支持。有时,您的作业可能会意外失败或超时,以至于锁未被释放。因此,您可以使用 expireAfter 方法显式定义锁过期时间。例如,下面的示例将指示 Laravel 在作业开始处理三分钟后释放 WithoutOverlapping 锁:

php
/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
exclamation

WithoutOverlapping 中间件需要支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。

在作业类之间共享锁键

默认情况下,WithoutOverlapping 中间件只会防止相同类的作业重叠。因此,尽管两个不同的作业类可能使用相同的锁键,但它们不会被阻止重叠。但是,您可以使用 shared 方法指示 Laravel 在作业类之间应用键:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
    // ...


    public function middleware()
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

class ProviderIsUp
{
    // ...


    public function middleware()
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

节流异常

Laravel 包含一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许您节流异常。一旦作业抛出给定数量的异常,所有进一步的作业执行尝试都将被延迟,直到指定的时间间隔过去。此中间件对于与不稳定的第三方服务交互的作业特别有用。

例如,假设一个与第三方 API 交互的队列作业开始抛出异常。要节流异常,您可以从作业的 middleware 方法返回 ThrottlesExceptions 中间件。通常,此中间件应与实现 基于时间的尝试 的作业配对:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [new ThrottlesExceptions(10, 5)];
}

/**
 * 确定作业应超时的时间。
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addMinutes(5);
}

中间件接受的第一个构造函数参数是作业可以抛出的异常数量,第二个构造函数参数是作业在被节流后应等待的分钟数。在上面的代码示例中,如果作业在 5 分钟内抛出 10 个异常,我们将等待 5 分钟后再尝试作业。

当作业抛出异常但异常阈值尚未达到时,作业通常会立即重试。但是,您可以在将中间件附加到作业时调用 backoff 方法来指定此类作业应延迟的分钟数:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [(new ThrottlesExceptions(10, 5))->backoff(5)];
}

在内部,此中间件使用 Laravel 的缓存系统来实现速率限制,并且作业的类名用作缓存“键”。您可以在将中间件附加到作业时调用 by 方法来覆盖此键。如果您有多个作业与同一第三方服务交互,并且希望它们共享一个公共的节流“桶”,这可能会很有用:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [(new ThrottlesExceptions(10, 10))->by('key')];
}
lightbulb

如果您使用 Redis,可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件,该中间件针对 Redis 进行了微调,比基本异常节流中间件更高效。

调度作业

编写作业类后,您可以使用作业本身的 dispatch 方法调度它。传递给 dispatch 方法的参数将传递给作业的构造函数:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    public function store(Request $request)
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);
    }
}

如果您希望有条件地调度作业,可以使用 dispatchIfdispatchUnless 方法:

php
ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用程序中,sync 驱动程序是默认的队列驱动程序。此驱动程序在当前请求的前台同步执行作业,这在本地开发期间通常很方便。如果您希望实际开始将作业排队以进行后台处理,可以在应用程序的 config/queue.php 配置文件中指定不同的队列驱动程序。

延迟调度

如果您希望指定作业不应立即可供队列工作者处理,可以在调度作业时使用 delay 方法。例如,让我们指定作业在调度后 10 分钟内不可用:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    public function store(Request $request)
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast)
                    ->delay(now()->addMinutes(10));
    }
}
exclamation

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

在响应发送到浏览器后调度

或者,dispatchAfterResponse 方法会延迟调度作业,直到 HTTP 响应发送到用户的浏览器(如果您的 Web 服务器使用 FastCGI)。这仍然允许用户开始使用应用程序,即使队列作业仍在执行。通常,这仅应用于大约需要一秒钟的作业,例如发送电子邮件。由于它们在当前 HTTP 请求中处理,因此以这种方式调度的作业不需要队列工作者运行即可处理它们:

php
use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

您还可以 dispatch 一个闭包,并将 afterResponse 方法链接到 dispatch 助手,以便在 HTTP 响应发送到浏览器后执行闭包:

php
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;

dispatch(function () {
    Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同步调度

如果您希望立即(同步)调度作业,可以使用 dispatchSync 方法。使用此方法时,作业不会被排队,而是会在当前进程中立即执行:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    public function store(Request $request)
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatchSync($podcast);
    }
}

作业与数据库事务

虽然在数据库事务中调度作业是完全可以的,但您应该特别注意确保您的作业能够成功执行。在事务中调度作业时,可能会在父事务提交之前由工作者处理作业。当这种情况发生时,您在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务期间创建的任何模型或数据库记录可能不存在于数据库中。

幸运的是,Laravel 提供了几种方法来解决此问题。首先,您可以在队列连接的配置数组中设置 after_commit 连接选项:

php
'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

after_commit 选项为 true 时,您可以在数据库事务中调度作业;但是,Laravel 将等待打开的父数据库事务提交后再实际调度作业。当然,如果当前没有打开的数据库事务,作业将立即调度。

如果由于事务期间发生的异常而回滚事务,则在该事务期间调度的作业将被丢弃。

lightbulb

after_commit 配置选项设置为 true 还会导致任何排队的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后被调度。

内联指定提交调度行为

如果您未将 after_commit 队列连接配置选项设置为 true,您仍然可以指示特定作业应在所有打开的数据库事务提交后调度。为此,您可以将 afterCommit 方法链接到您的调度操作:

php
use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果 after_commit 配置选项设置为 true,您可以指示特定作业应立即调度,而无需等待任何打开的数据库事务提交:

php
ProcessPodcast::dispatch($podcast)->beforeCommit();

作业链

作业链允许您指定在主作业成功执行后应按顺序运行的队列作业列表。如果链中的一个作业失败,则不会运行其余的作业。要执行队列作业链,您可以使用 Bus facade 提供的 chain 方法。Laravel 的命令总线是构建在队列作业调度之上的较低级别组件:

php
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了链接作业类实例,您还可以链接闭包:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();
exclamation

在作业中使用 $this->delete() 方法删除作业不会阻止链式作业被处理。链将仅在链中的作业失败时停止执行。

链连接和队列

如果您希望指定应为链式作业使用的连接和队列,可以使用 onConnectiononQueue 方法。这些方法指定队列连接和队列名称,除非队列作业明确分配了不同的连接/队列:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

链失败

在链接作业时,您可以使用 catch 方法指定在链中的作业失败时应调用的闭包。给定的回调将接收导致作业失败的 Throwable 实例:

php
use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // 链中的作业失败...
})->dispatch();
exclamation

由于链回调在稍后由 Laravel 队列序列化并执行,因此您不应在链回调中使用 $this 变量。

自定义队列和连接

调度到特定队列

通过将作业推送到不同的队列,您可以“分类”队列作业,甚至可以优先考虑为各种队列分配多少工作者。请记住,这不会将作业推送到队列配置文件中定义的不同队列“连接”,而仅是单个连接中的特定队列。要指定队列,请在调度作业时使用 onQueue 方法:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    public function store(Request $request)
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}

或者,您可以通过在作业的构造函数中调用 onQueue 方法来指定作业的队列:

php
<?php

namespace App\Jobs;

 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 创建一个新的作业实例。
     *
     * @return void
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

调度到特定连接

如果您的应用程序与多个队列连接交互,可以使用 onConnection 方法指定将作业推送到哪个连接:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    public function store(Request $request)
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}

您可以将 onConnectiononQueue 方法链接在一起,以指定作业的连接和队列:

php
ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

或者,您可以通过在作业的构造函数中调用 onConnection 方法来指定作业的连接:

php
<?php

namespace App\Jobs;

 use Illuminate\Bus\Queueable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Bus\Dispatchable;
 use Illuminate\Queue\InteractsWithQueue;
 use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 创建一个新的作业实例。
     *
     * @return void
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

指定最大作业尝试次数/超时值

最大尝试次数

如果您的队列作业遇到错误,您可能不希望它无限期地重试。因此,Laravel 提供了多种方法来指定作业可以尝试的次数或时间。

指定作业可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这将适用于工作者处理的所有作业,除非正在处理的作业指定了可以尝试的次数:

shell
php artisan queue:work --tries=3

如果作业超过其最大尝试次数,它将被视为“失败”作业。有关处理失败作业的更多信息,请参阅 失败作业文档。如果 --tries=0 提供给 queue:work 命令,作业将无限期重试。

您可以通过在作业类本身上定义作业可以尝试的最大次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的 --tries 值:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 5;
}

基于时间的尝试

作为定义作业可以尝试的最大次数的替代方法,您可以定义作业不再尝试的时间。这允许在给定时间范围内尝试任意次数的作业。要定义作业不再尝试的时间,请在作业类中添加一个 retryUntil 方法。此方法应返回一个 DateTime 实例:

php
/**
 * 确定作业应超时的时间。
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addMinutes(10);
}
lightbulb

您还可以在 队列事件监听器 上定义 tries 属性或 retryUntil 方法。

最大异常

有时您可能希望指定作业可以尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是直接由 release 方法释放),则应失败。为此,您可以在作业类上定义一个 maxExceptions 属性:

php
<?php

namespace App\Jobs;

use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 25;

    /**
     * 允许的最大未处理异常数。
     *
     * @var int
     */
    public $maxExceptions = 3;

    /**
     * 执行作业。
     *
     * @return void
     */
    public function handle()
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // 获取到锁,处理播客...
        }, function () {
            // 无法获取锁...
            return $this->release(10);
        });
    }
}

在此示例中,如果应用程序无法获取 Redis 锁,作业将释放十秒钟,并将继续重试最多 25 次。但是,如果作业抛出三个未处理异常,则作业将失败。

超时

exclamation

必须安装 pcntl PHP 扩展才能指定作业超时。

通常,您大致知道队列作业的预期执行时间。出于这个原因,Laravel 允许您指定“超时”值。默认情况下,超时值为 60 秒。如果作业处理时间超过超时值指定的秒数,处理作业的工作者将以错误退出。通常,工作者将由 服务器上配置的进程管理器 自动重新启动。

可以使用 Artisan 命令行上的 --timeout 开关指定作业可以运行的最大秒数:

shell
php artisan queue:work --timeout=30

如果作业因持续超时而超过其最大尝试次数,它将被标记为失败。

您还可以在作业类本身上定义作业应允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以运行的最大秒数。
     *
     * @var int
     */
    public $timeout = 120;
}

有时,IO 阻塞进程(如套接字或传出 HTTP 连接)可能不尊重您指定的超时。因此,在使用这些功能时,您应始终尝试使用其 API 指定超时值。例如,在使用 Guzzle 时,您应始终指定连接和请求超时值。

超时失败

如果您希望指示作业在超时时应标记为 失败,可以在作业类上定义 $failOnTimeout 属性:

php
/**
 * 指示作业在超时时是否应标记为失败。
 *
 * @var bool
 */
public $failOnTimeout = true;

错误处理

如果在处理作业时抛出异常,作业将自动释放回队列,以便可以再次尝试。作业将继续释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,可以在作业类本身上定义最大尝试次数。有关运行队列工作者的更多信息 可以在下面找到

手动释放作业

有时您可能希望手动将作业释放回队列,以便可以在稍后时间再次尝试。您可以通过调用 release 方法来实现:

php
/**
 * 执行作业。
 *
 * @return void
 */
public function handle()
{
    // ...

    $this->release();
}

默认情况下,release 方法会将作业释放回队列以立即处理。但是,通过将整数传递给 release 方法,您可以指示队列在给定秒数过去之前不使作业可用:

php
$this->release(10);

手动失败作业

有时您可能需要手动将作业标记为“失败”。为此,您可以调用 fail 方法:

php
/**
 * 执行作业。
 *
 * @return void
 */
public function handle()
{
    // ...

    $this->fail();
}

如果您希望由于捕获的异常而将作业标记为失败,可以将异常传递给 fail 方法。或者,为了方便起见,您可以传递一个字符串错误消息,该消息将为您转换为异常:

php
$this->fail($exception);

$this->fail('出了点问题。');
lightbulb

有关失败作业的更多信息,请查看 处理作业失败的文档

作业批处理

Laravel 的作业批处理功能允许您轻松执行一批作业,然后在作业批处理完成执行时执行某些操作。在开始之前,您应该创建一个数据库迁移,以构建一个表来包含有关作业批处理的元信息,例如其完成百分比。可以使用 queue:batches-table Artisan 命令生成此迁移:

shell
php artisan queue:batches-table

php artisan migrate

定义可批处理的作业

要定义可批处理的作业,您应该像往常一样 创建一个可队列的作业;但是,您应该将 Illuminate\Bus\Batchable 特性添加到作业类中。此特性提供了一个 batch 方法,可用于检索作业正在执行的当前批处理:

php
<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ImportCsv implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 执行作业。
     *
     * @return void
     */
    public function handle()
    {
        if ($this->batch()->cancelled()) {
            // 确定批处理是否已取消...

            return;
        }

        // 导入 CSV 文件的一部分...
    }
}

调度批处理

要调度一批作业,您应该使用 Bus facade 的 batch 方法。当然,批处理在与完成回调结合使用时最有用。因此,您可以使用 thencatchfinally 方法为批处理定义完成回调。每个回调在调用时都会接收一个 Illuminate\Bus\Batch 实例。在此示例中,我们将假设我们正在排队一批作业,每个作业处理 CSV 文件中的给定行数:

php
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->then(function (Batch $batch) {
    // 所有作业成功完成...
})->catch(function (Batch $batch, Throwable $e) {
    // 检测到第一个批处理作业失败...
})->finally(function (Batch $batch) {
    // 批处理已完成执行...
})->dispatch();

return $batch->id;

批处理的 ID,可以通过 $batch->id 属性访问,可用于 查询 Laravel 命令总线 以获取有关批处理的信息。

exclamation

由于批处理回调在稍后由 Laravel 队列序列化并执行,因此您不应在回调中使用 $this 变量。

命名批处理

某些工具(如 Laravel Horizon 和 Laravel Telescope)可能会为批处理提供更用户友好的调试信息。如果批处理被命名,可以通过在定义批处理时调用 name 方法来为批处理分配任意名称:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业成功完成...
})->name('Import CSV')->dispatch();

批处理连接和队列

如果您希望指定应为批处理作业使用的连接和队列,可以使用 onConnectiononQueue 方法。所有批处理作业必须在同一连接和队列中执行:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业成功完成...
})->onConnection('redis')->onQueue('imports')->dispatch();

批处理中的链

您可以通过将链式作业放在数组中来定义批处理中的 链式作业。例如,我们可以并行执行两个作业链,并在两个作业链完成处理时执行回调:

php
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
    [
        new ReleasePodcast(1),
        new SendPodcastReleaseNotification(1),
    ],
    [
        new ReleasePodcast(2),
        new SendPodcastReleaseNotification(2),
    ],
])->then(function (Batch $batch) {
    // ...
})->dispatch();

向批处理中添加作业

有时,在批处理作业中添加其他作业可能很有用。当您需要批处理数千个作业时,这种模式可能很有用,因为在 Web 请求期间调度这些作业可能需要太长时间。因此,您可能希望调度一批“加载器”作业,以便为批处理提供更多作业:

php
$batch = Bus::batch([
    new LoadImportBatch,
    new LoadImportBatch,
    new LoadImportBatch,
])->then(function (Batch $batch) {
    // 所有作业成功完成...
})->name('Import Contacts')->dispatch();

在此示例中,我们将使用 LoadImportBatch 作业为批处理提供更多作业。为此,我们可以使用作业的 batch 方法访问的批处理实例上的 add 方法:

php
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * 执行作业。
 *
 * @return void
 */
public function handle()
{
    if ($this->batch()->cancelled()) {
        return;
    }

    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}
exclamation

您只能在属于同一批处理的作业中向批处理中添加作业。

检查批处理

提供给批处理完成回调的 Illuminate\Bus\Batch 实例具有多种属性和方法,可帮助您与给定的作业批处理进行交互和检查:

php
// 批处理的 UUID...
$batch->id;

// 批处理的名称(如果适用)...
$batch->name;

// 分配给批处理的作业数量...
$batch->totalJobs;

// 队列尚未处理的作业数量...
$batch->pendingJobs;

// 失败的作业数量...
$batch->failedJobs;

// 到目前为止已处理的作业数量...
$batch->processedJobs();

// 批处理的完成百分比(0-100)...
$batch->progress();

// 指示批处理是否已完成执行...
$batch->finished();

// 取消批处理的执行...
$batch->cancel();

// 指示批处理是否已取消...
$batch->cancelled();

从路由返回批处理

所有 Illuminate\Bus\Batch 实例都是 JSON 可序列化的,这意味着您可以直接从应用程序的路由返回它们,以检索包含有关批处理的信息的 JSON 负载,包括其完成进度。这使得在应用程序的 UI 中显示有关批处理完成进度的信息变得方便。

要通过其 ID 检索批处理,可以使用 Bus facade 的 findBatch 方法:

php
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
    return Bus::findBatch($batchId);
});

取消批处理

有时您可能需要取消给定批处理的执行。这可以通过调用 Illuminate\Bus\Batch 实例上的 cancel 方法来实现:

php
/**
 * 执行作业。
 *
 * @return void
 */
public function handle()
{
    if ($this->user->exceedsImportLimit()) {
        return $this->batch()->cancel();
    }

    if ($this->batch()->cancelled()) {
        return;
    }
}

正如您在前面的示例中可能注意到的,批处理作业通常应在继续执行之前确定其对应的批处理是否已取消。但是,为了方便起见,您可以将 SkipIfBatchCancelled 中间件 分配给作业。顾名思义,此中间件将指示 Laravel 如果其对应的批处理已取消,则不处理作业:

php
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [new SkipIfBatchCancelled];
}

批处理失败

当批处理作业失败时,将调用 catch 回调(如果已分配)。此回调仅在批处理中的第一个作业失败时调用。

允许失败

当批处理中的作业失败时,Laravel 将自动将批处理标记为“已取消”。如果您愿意,可以禁用此行为,以便作业失败不会自动将批处理标记为已取消。这可以通过在调度批处理时调用 allowFailures 方法来实现:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业成功完成...
})->allowFailures()->dispatch();

重试失败的批处理作业

为了方便起见,Laravel 提供了一个 queue:retry-batch Artisan 命令,允许您轻松重试给定批处理的所有失败作业。queue:retry-batch 命令接受应重试其失败作业的批处理的 UUID:

shell
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批处理

如果不进行修剪,job_batches 表可能会非常快速地积累记录。为了解决这个问题,您应该 计划 queue:prune-batches Artisan 命令每天运行:

php
$schedule->command('queue:prune-batches')->daily();

默认情况下,所有超过 24 小时的已完成批处理将被修剪。您可以在调用命令时使用 hours 选项来确定保留批处理数据的时间。例如,以下命令将删除所有在 48 小时前完成的批处理:

php
$schedule->command('queue:prune-batches --hours=48')->daily();

有时,您的 jobs_batches 表可能会积累从未成功完成的批处理的批处理记录,例如作业失败且该作业从未成功重试的批处理。您可以指示 queue:prune-batches 命令修剪这些未完成的批处理记录,使用 unfinished 选项:

php
$schedule->command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,您的 jobs_batches 表也可能会积累已取消批处理的批处理记录。您可以指示 queue:prune-batches 命令修剪这些已取消的批处理记录,使用 cancelled 选项:

php
$schedule->command('queue:prune-batches --hours=48 --cancelled=72')->daily();

队列闭包

您可以将闭包调度到队列,而不是将作业类调度到队列。这对于需要在当前请求周期之外执行的快速简单任务非常有用。在将闭包调度到队列时,闭包的代码内容会被加密签名,以便在传输过程中无法修改:

php
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

使用 catch 方法,您可以提供一个闭包,如果队列闭包在耗尽所有 配置的重试尝试 后未能成功完成,则应执行该闭包:

php
use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // 此作业已失败...
});
exclamation

由于 catch 回调在稍后由 Laravel 队列序列化并执行,因此您不应在 catch 回调中使用 $this 变量。

运行队列工作者

queue:work 命令

Laravel 包含一个 Artisan 命令,该命令将启动队列工作者并在新作业推送到队列时处理它们。您可以使用 queue:work Artisan 命令运行工作者。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端:

shell
php artisan queue:work
lightbulb

要使 queue:work 进程永久在后台运行,您应该使用 Supervisor 等进程监视器,以确保队列工作者不会停止运行。

如果您希望在调用 queue:work 命令时包含已处理作业 ID,可以包含 -v 标志:

shell
php artisan queue:work -v

请记住,队列工作者是长时间运行的进程,并在内存中存储已启动的应用程序状态。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请确保 重新启动队列工作者。此外,请记住,应用程序创建或修改的任何静态状态在作业之间不会自动重置。

或者,您可以运行 queue:listen 命令。使用 queue:listen 命令时,您不必在想要重新加载更新的代码或重置应用程序状态时手动重新启动工作者;但是,此命令的效率明显低于 queue:work 命令:

shell
php artisan queue:listen

运行多个队列工作者

要为队列分配多个工作者并并发处理作业,您只需启动多个 queue:work 进程。这可以在本地通过终端中的多个选项卡完成,也可以在生产中使用进程管理器的配置设置完成。使用 Supervisor 时,您可以使用 numprocs 配置值。

指定连接和队列

您还可以指定工作者应使用哪个队列连接。传递给 work 命令的连接名称应对应于 config/queue.php 配置文件中定义的连接之一:

shell
php artisan queue:work redis

默认情况下,queue:work 命令仅处理给定连接的默认队列的作业。但是,您可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在 redis 队列连接的 emails 队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:

shell
php artisan queue:work redis --queue=emails

处理指定数量的作业

可以使用 --once 选项指示工作者仅处理队列中的一个作业:

shell
php artisan queue:work --once

可以使用 --max-jobs 选项指示工作者处理给定数量的作业,然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工作者在处理给定数量的作业后自动重新启动,释放它们可能积累的任何内存:

shell
php artisan queue:work --max-jobs=1000

处理所有排队的作业然后退出

可以使用 --stop-when-empty 选项指示工作者处理所有作业,然后优雅地退出。如果您希望在队列为空后关闭容器,则此选项在 Docker 容器中处理 Laravel 队列时可能很有用:

shell
php artisan queue:work --stop-when-empty

处理给定秒数的作业

可以使用 --max-time 选项指示工作者处理给定秒数的作业,然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工作者在处理给定时间的作业后自动重新启动,释放它们可能积累的任何内存:

shell
# 处理作业一小时,然后退出...
php artisan queue:work --max-time=3600

工作者休眠时间

当队列中有作业可用时,工作者将继续处理作业,而不会在作业之间延迟。但是,sleep 选项确定如果没有可用作业,工作者将“休眠”多少秒。当然,在休眠期间,工作者不会处理任何新作业:

shell
php artisan queue:work --sleep=3

资源考虑

守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,您应该在处理完图像后使用 imagedestroy 释放内存。

队列优先级

有时您可能希望优先处理队列的处理方式。例如,在 config/queue.php 配置文件中,您可以将 redis 连接的默认 queue 设置为 low。但是,有时您可能希望将作业推送到 high 优先级队列,如下所示:

php
dispatch((new Job)->onQueue('high'));

要启动一个工作者,以确保在继续处理 low 队列的作业之前处理所有 high 队列作业,请将队列名称的逗号分隔列表传递给 work 命令:

shell
php artisan queue:work --queue=high,low

队列工作者与部署

由于队列工作者是长时间运行的进程,因此它们不会注意到代码的更改而无需重新启动。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重新启动工作者。您可以通过发出 queue:restart 命令优雅地重新启动所有工作者:

shell
php artisan queue:restart

此命令将指示所有队列工作者在完成当前作业后优雅地退出,以便不会丢失现有作业。由于在执行 queue:restart 命令时队列工作者将退出,因此您应该运行一个进程管理器,例如 Supervisor,以自动重新启动队列工作者。

lightbulb

队列使用 缓存 存储重启信号,因此在使用此功能之前,您应验证应用程序是否正确配置了缓存驱动程序。

作业过期和超时

作业过期

config/queue.php 配置文件中,每个队列连接定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after 的值设置为 90,则如果作业已处理 90 秒而未释放或删除,则作业将被释放回队列。通常,您应该将 retry_after 值设置为作业应合理完成处理的最大秒数。

exclamation

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的 默认可见性超时 重试作业。

工作者超时

queue:work Artisan 命令公开了一个 --timeout 选项。默认情况下,--timeout 值为 60 秒。如果作业处理时间超过超时值指定的秒数,处理作业的工作者将以错误退出。通常,工作者将由 服务器上配置的进程管理器 自动重新启动:

shell
php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项是不同的,但它们协同工作以确保作业不会丢失,并且作业仅成功处理一次。

exclamation

--timeout 值应始终至少比 retry_after 配置值短几秒钟。这将确保在作业重试之前始终终止处理冻结作业的工作者。如果您的 --timeout 选项长于 retry_after 配置值,您的作业可能会被处理两次。

Supervisor 配置

在生产环境中,你需要一种方法来保持 queue:work 进程的运行。queue:work 进程可能会因为多种原因停止运行,例如超出工作者超时时间或执行 queue:restart 命令。

因此,你需要配置一个进程监控器来检测 queue:work 进程何时退出并自动重启它们。此外,进程监控器可以让你指定希望同时运行多少个 queue:work 进程。Supervisor 是一个常用于 Linux 环境的进程监控器,我们将在接下来的文档中讨论如何配置它。

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监控器,如果 queue:work 进程失败,它将自动重启它们。要在 Ubuntu 上安装 Supervisor,可以使用以下命令:

shell
sudo apt-get install supervisor
lightbulb

如果自己配置和管理 Supervisor 让你感到不知所措,可以考虑使用 Laravel Forge,它会自动为你的生产 Laravel 项目安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,你可以创建任意数量的配置文件,指示 Supervisor 如何监控你的进程。例如,让我们创建一个 laravel-worker.conf 文件来启动和监控 queue:work 进程:

ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在此示例中,numprocs 指令将指示 Supervisor 运行八个 queue:work 进程并监控所有进程,如果它们失败会自动重启。你应该更改配置中的 command 指令以反映你所需的队列连接和工作者选项。

exclamation

你应该确保 stopwaitsecs 的值大于最长运行作业所消耗的秒数。否则,Supervisor 可能会在作业完成处理之前将其终止。

启动 Supervisor

创建配置文件后,可以使用以下命令更新 Supervisor 配置并启动进程:

shell
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

有关 Supervisor 的更多信息,请查阅 Supervisor 文档

处理失败的作业

有时你的队列作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定作业应尝试的最大次数。在异步作业超过此尝试次数后,它将被插入到 failed_jobs 数据库表中。同步调度的作业失败时不会存储在此表中,其异常会立即由应用程序处理。

在新的 Laravel 应用程序中,通常已经存在一个用于创建 failed_jobs 表的迁移。然而,如果你的应用程序不包含此表的迁移,可以使用 queue:failed-table 命令创建迁移:

shell
php artisan queue:failed-table

php artisan migrate

运行队列工作者进程时,可以使用 queue:work 命令上的 --tries 开关指定作业应尝试的最大次数。如果未为 --tries 选项指定值,作业将仅尝试一次或按作业类的 $tries 属性指定的次数尝试:

shell
php artisan queue:work redis --tries=3

使用 --backoff 选项,可以指定 Laravel 在重试遇到异常的作业之前应等待多少秒。默认情况下,作业会立即释放回队列,以便可以再次尝试:

shell
php artisan queue:work redis --tries=3 --backoff=3

如果希望在每个作业的基础上配置 Laravel 在重试遇到异常的作业之前应等待多少秒,可以通过在作业类上定义 backoff 属性来实现:

php
/**
 * 在重试作业之前等待的秒数。
 *
 * @var int
 */
public $backoff = 3;

如果需要更复杂的逻辑来确定作业的回退时间,可以在作业类上定义一个 backoff 方法:

php
/**
* 计算在重试作业之前等待的秒数。
*
* @return int
*/
public function backoff()
{
    return 3;
}

可以通过从 backoff 方法返回一个回退值数组来轻松配置“指数”回退。在此示例中,重试延迟将为第一次重试 1 秒,第二次重试 5 秒,第三次重试 10 秒:

php
/**
* 计算在重试作业之前等待的秒数。
*
* @return array
*/
public function backoff()
{
    return [1, 5, 10];
}

清理失败的作业

当某个作业失败时,你可能希望向用户发送警报或撤销作业部分完成的任何操作。为此,可以在作业类上定义一个 failed 方法。导致作业失败的 Throwable 实例将传递给 failed 方法:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 播客实例。
     *
     * @var \App\Podcast
     */
    public $podcast;

    /**
     * 创建一个新的作业实例。
     *
     * @param  \App\Models\Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行作业。
     *
     * @param  \App\Services\AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客...
    }

    /**
     * 处理作业失败。
     *
     * @param  \Throwable  $exception
     * @return void
     */
    public function failed(Throwable $exception)
    {
        // 发送用户失败通知等...
    }
}
exclamation

在调用 failed 方法之前,会实例化作业的新实例;因此,在 handle 方法中可能发生的任何类属性修改都将丢失。

重试失败的作业

要查看已插入 failed_jobs 数据库表中的所有失败作业,可以使用 queue:failed Artisan 命令:

shell
php artisan queue:failed

queue:failed 命令将列出作业 ID、连接、队列、失败时间和有关作业的其他信息。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失败作业,请发出以下命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

如有必要,可以将多个 ID 传递给命令:

shell
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

还可以重试特定队列的所有失败作业:

shell
php artisan queue:retry --queue=name

要重试所有失败的作业,请执行 queue:retry 命令并将 all 作为 ID 传递:

shell
php artisan queue:retry all

如果希望删除失败的作业,可以使用 queue:forget 命令:

shell
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
lightbulb

使用 Horizon 时,应使用 horizon:forget 命令删除失败的作业,而不是 queue:forget 命令。

要从 failed_jobs 表中删除所有失败的作业,可以使用 queue:flush 命令:

shell
php artisan queue:flush

忽略缺失的模型

在将 Eloquent 模型注入作业时,模型会在放入队列之前自动序列化,并在作业处理时从数据库中重新检索。然而,如果模型在作业等待工作者处理时被删除,作业可能会因 ModelNotFoundException 而失败。

为了方便起见,可以通过将作业的 deleteWhenMissingModels 属性设置为 true 来选择自动删除缺失模型的作业。当此属性设置为 true 时,Laravel 将安静地丢弃作业而不引发异常:

php
/**
 * 如果模型不再存在,则删除作业。
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

修剪失败的作业

可以通过调用 queue:prune-failed Artisan 命令来修剪应用程序的 failed_jobs 表中的记录:

shell
php artisan queue:prune-failed

默认情况下,所有超过 24 小时的失败作业记录将被修剪。如果为命令提供 --hours 选项,则仅保留在最近 N 小时内插入的失败作业记录。例如,以下命令将删除所有超过 48 小时的失败作业记录:

shell
php artisan queue:prune-failed --hours=48

在 DynamoDB 中存储失败的作业

Laravel 还提供了将失败作业记录存储在 DynamoDB 中而不是关系数据库表中的支持。然而,必须创建一个 DynamoDB 表来存储所有失败的作业记录。通常,此表应命名为 failed_jobs,但应根据应用程序的 queue 配置文件中的 queue.failed.table 配置值命名表。

failed_jobs 表应具有一个名为 application 的字符串主分区键和一个名为 uuid 的字符串主排序键。键的 application 部分将包含应用程序的名称,如应用程序的 app 配置文件中的 name 配置值所定义。由于应用程序名称是 DynamoDB 表键的一部分,可以使用同一表来存储多个 Laravel 应用程序的失败作业。

此外,请确保安装 AWS SDK,以便 Laravel 应用程序可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

接下来,将 queue.failed.driver 配置选项的值设置为 dynamodb。此外,应在失败作业配置数组中定义 keysecretregion 配置选项。这些选项将用于 AWS 身份验证。使用 dynamodb 驱动程序时,不需要 queue.failed.database 配置选项:

php
'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

禁用失败作业存储

可以通过将 queue.failed.driver 配置选项的值设置为 null 来指示 Laravel 丢弃失败的作业而不存储它们。通常,可以通过 QUEUE_FAILED_DRIVER 环境变量来实现:

ini
QUEUE_FAILED_DRIVER=null

失败作业事件

如果希望注册一个事件监听器,该监听器将在作业失败时调用,可以使用 Queue facade 的 failing 方法。例如,可以从 Laravel 附带的 AppServiceProviderboot 方法中附加一个闭包到此事件:

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     *
     * @return void
     */
    public function register()
    {
        //
    }

    /**
     * 启动任何应用程序服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

从队列中清除作业

lightbulb

使用 Horizon 时,应使用 horizon:clear 命令从队列中清除作业,而不是 queue:clear 命令。

如果希望从默认连接的默认队列中删除所有作业,可以使用 queue:clear Artisan 命令:

shell
php artisan queue:clear

还可以提供 connection 参数和 queue 选项以从特定连接和队列中删除作业:

shell
php artisan queue:clear redis --queue=emails
exclamation

从队列中清除作业仅适用于 SQS、Redis 和数据库队列驱动程序。此外,SQS 消息删除过程最多需要 60 秒,因此在清除队列后最多 60 秒内发送到 SQS 队列的作业也可能会被删除。

监控你的队列

如果队列接收到突然涌入的作业,可能会不堪重负,导致作业完成的等待时间过长。如果愿意,Laravel 可以在队列作业计数超过指定阈值时提醒你。

要开始,可以安排 queue:monitor 命令每分钟运行。该命令接受你希望监控的队列名称以及所需的作业计数阈值:

shell
php artisan queue:monitor redis:default,redis:deployments --max=100

仅安排此命令不足以触发通知以提醒你队列的超负荷状态。当命令遇到作业计数超过阈值的队列时,将调度一个 Illuminate\Queue\Events\QueueBusy 事件。可以在应用程序的 EventServiceProvider 中监听此事件,以便向你或开发团队发送通知:

php
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * 为应用程序注册任何其他事件。
 *
 * @return void
 */
public function boot()
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
                ->notify(new QueueHasLongWaitTime(
                    $event->connection,
                    $event->queue,
                    $event->size
                ));
    });
}

作业事件

使用 Queue facade 上的 beforeafter 方法,可以指定在处理队列作业之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计数据的好机会。通常,应从服务提供者boot 方法中调用这些方法。例如,可以使用 Laravel 附带的 AppServiceProvider

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     *
     * @return void
     */
    public function register()
    {
        //
    }

    /**
     * 启动任何应用程序服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

使用 Queue facade 上的 looping 方法,可以指定在工作者尝试从队列获取作业之前执行的回调。例如,可以注册一个闭包以回滚任何由先前失败的作业留下的事务:

php
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});