Laravel 中使用 Redis 队列处理异步任务
这篇文章来自 Laravel China 教程中的第二本书 《Web 实战开发进阶》,全书搭建了一个论坛系统。在之前的章节中我们已经完成了 SEO 友好的 URL 功能即通过将帖子标题翻译成英文并在该帖子的 URL 中显示这一功能我们集成使用了百度翻译接口默认情况下采用的是实时请求 API 通常情况下会有各种不确定因素影响如果出现超时问题或者遇到不可预见的技术故障可能导致用户无法正常发帖
该 URL 仅作为一个 功能优化 ,并非用于发帖所需的一个 非必须 功能。我们期望无论生成 英文标题 的效果如何,所有人都能轻松地发布帖子,并且不会感知到任何延迟。
因此我们可以采用队列机制来实现这一功能;队列支持异步执行耗时操作;例如发送一个API请求并等待其返回结果。这种方法能够显著地降低请求响应的时间;实现这个功能需要以下几个步骤:
1.配置队列
Redis将被用作我们的队列驱动器;通过Composer安装必要的依赖项:\$ composer require "predis/predis:~1";环境变量QUEUE_DRIVER将被设置为 redis:
.env
.
.
.
QUEUE_CONNECTION=redis
.
.
.
失败任务
在某些情况下,队列中的任务可能会出现失败。Laravel 提供了一种便捷方法来设置任务重试的最大次数。一旦任务的重试次数超过预先设定的上限时,系统会将其记录至 failed_jobs 数据库表中。为了生成迁移文件以更新 failed_jobs 表,请使用命令 php artisan queue:failed-table:
$ php artisan queue:failed-table
该迁移脚本将被生成到指定路径中的database/migrations/{timestamp}_create_failed_jobs_table.php文件。
接着使用 migrate Artisan 命令生成 failed_jobs 表:
$ php artisan migrate
2.生成任务类
使用以下 Artisan 命令来生成一个新的队列任务:
$ php artisan make:job TranslateSlug
该命令会在 app/Jobs 目录下生成一个新的类:
app/Jobs/TranslateSlug.php
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Models\Topic;
use App\Handlers\SlugTranslateHandler;
class TranslateSlug implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $topic;
public function __construct(Topic $topic)
{
// 队列任务构造器中接收了 Eloquent 模型,将会只序列化模型的 ID
$this->topic = $topic;
}
public function handle()
{
// 请求百度 API 接口进行翻译
$slug = app(SlugTranslateHandler::class)->translate($this->topic->title);
// 为了避免模型监控器死循环调用,我们使用 DB 类直接对数据库进行操作
\DB::table('topics')->where('id', $this->topic->id)->update(['slug' => $slug]);
}
}
该类遵循了 Illuminate 皇后模块 ShouldQueue 接口,并被定义为将任务添加到后台队列中而不是立即执行。
3.任务分发
下一步我们将调整 Topic 模型监控器的功能实现,并采用队列执行的方式来优化现有函数调用
app/Observers/TopicObserver.php
<?php
namespace App\Observers;
use App\Models\Topic;
use App\Jobs\TranslateSlug;
// creating, created, updating, updated, saving,
// saved, deleting, deleted, restoring, restored
class TopicObserver
{
public function saving(Topic $topic)
{
// XSS 过滤
$topic->body = clean($topic->body, 'user_topic_body');
// 生成话题摘录
$topic->excerpt = make_excerpt($topic->body);
// 如 slug 字段无内容,即使用翻译器对 title 进行翻译
if ( ! $topic->slug) {
// 推送任务到队列
dispatch(new TranslateSlug($topic));
}
}
}
4.开始测试
在操作开始前,请确保在命令行中进行队列系统的启动,并注意观察队列状态以确保其在启动完成后能够正常运行。
$ php artisan queue:listen
浏览器打开话题发布页面,填写测试内容:

点击『保存』按钮提交表单后,可以在命令行中看到监听的状态:

注意到我们的任务 Failed 已经运行并出现了故障。建议检查数据库中的 failed_jobs 表以获取相关信息。

尽管我们可以通过 payload 和 exception 字段获取错误数据,但这些数据是在经过序列化处理之后的,并不足够直观。

接下来我们将寻找更好的队列监控方案。
5.队列监控 Horizon

Horizon 是 Laravel 生态系统的组成部分之一,并向 Laravel Redis 队列提供了一个美观的监控界面,方便用户查询和管理Redis队列的任务执行情况。
使用 Composer 安装:
$ composer require "laravel/horizon:~1.3"
安装完成后,使用 vendor:publish Artisan 命令发布相关文件:
Run the PHP artisan command to publish the VCS vendor package with the Horizon ServiceProvider.
包括两个部分:一个是配置文件夹 config/horizon.php ,另一个是位于 public/vendor/horizon 文件夹中的 CSS 以及 JS 等页面资源文件
至此安装完毕,浏览器打开 http://larabbs.test/horizon 访问控制台:

Horizon 是一个监控程序,需要常驻运行,我们可以通过以下命令启动:
$ php artisan horizon
部署了 Horizon 以后, horizon 命令将被用来启动队列系统和任务监控,无需依赖于 queue:listen
接下来我们再次测试下发过程是否正常运行。发帖前,请使 horizon 命令进入监控状态。

此次得益于 Horizon。我们能够查看更为详细的信息。该异常信息为 ModelNotFoundException。此问题最为关键。

经排查发现,在Data块中存在id字段值为空的情况。由于我们在处理队列任务时会将Eloquent模型传递给队列系统进行序列化操作,在此过程中系统会自动提取并保存各字段信息。因此,在 saving() 方法执行时会将当前未被创建的主题模型$topic的id字段设为null值。
6.代码调整
一旦我们识别了该问题所在并掌握了其影响范围之后,则可以通过优化资源分配来实现快速响应这一目标。具体来说,在处理相似事件时需要确保系统能够及时获取相关信息,并在必要时触发相关应急机制以减少潜在损失的可能性。为了进一步提升应对效率,在日常运营中应定期评估现有流程并根据实际效果进行适当调整以优化资源配置效率:
app/Observers/TopicObserver.php
<?php
namespace App\Observers;
use App\Models\Topic;
use App\Jobs\TranslateSlug;
// creating, created, updating, updated, saving,
// saved, deleting, deleted, restoring, restored
class TopicObserver
{
public function saving(Topic $topic)
{
// XSS 过滤
$topic->body = clean($topic->body, 'user_topic_body');
// 生成话题摘录
$topic->excerpt = make_excerpt($topic->body);
}
public function saved(Topic $topic)
{
// 如 slug 字段无内容,即使用翻译器对 title 进行翻译
if ( ! $topic->slug) {
// 推送任务到队列
dispatch(new TranslateSlug($topic));
}
}
}
模型监控器的保存操作(saved())与Eloquent的save事件相匹配,在建立或更新条目以及数据被提交之后触发;通过调用这个方法,在分发任务时我们能够确保$topic->id始终有效。
需要注意的是,在 artisan horizon 队列工作的守护进程中设置了常驻运行状态设置,并且该守护进程具备高度的稳定性特点,在遇到代码更改时无需重启功能组件。因此当代码发生更改后就需要通过命令行界面重新启动相关服务程序以恢复正常工作流程
重启 horizon 命令后再次尝试,即可看到成功运行的队列:

7.线上部署须知
在开发环境中,我们为了便于测试,可以直接在命令行中执行artifACTS horizon队列监控程序。然而,而在生产环境中,为了监控artifACTS horizon命令的执行情况而配置了一个进程管理工具,以便在其意外退出时能够自动重新启动。当服务器部署新的代码basebase时,需要先终止当前 Horizon主进程,然后可以通过进程管理工具实现重新启动的过程。
在生产环境下使用队列时需要注意以下两个问题:每次部署代码前必须执行artifACT任务名:终止(terminate)后才能重新编译代码。
