Очереди Laravel — как это работает

Веб-приложения и программное обеспечение чаще всего оцениваются положительно, если они функционируют быстро. Однако некоторые задачи требуют времени, такие как автоматическая отправка электронных писем или создание подробного отчета путем анализа тысяч строк данных.

Встречались ли вы с сайтом, который заставлял вас ждать на экране загрузки, пока он отправлял вам письмо с подтверждением? Вероятно, нет, поскольку большинство веб-приложений, по крайней мере хороших, выполняют долгосрочные задачи асинхронно в фоновом режиме, что и позволяют сделать очереди Laravel (Laravel Queues).

В этой статье вы узнаете, как асинхронно обрабатывать задачи, используя очереди и обработчики в Laravel. Предполагается, что у вас установлены PHP, Composer, MySQL и Redis на вашей системе, и вы знаете, как создать и запустить новый проект на Laravel.

P.S. Лично я использовал Laravel Sail

Введение в задачи, очереди и обработчики

Прежде чем погрузиться в код, я хочу представить вам аналогию из реальной жизни, чтобы объяснить асинхронное выполнение задач. Представьте, что вы владелец местного продуктового магазина, который принимает заказы по телефону и доставляет товары до двери клиента.

Теперь представьте, что вы получили звонок от клиента и приняли новый заказ. Затем вы идете на склад, ищете отдельные продукты, упаковываете их в коробку и отправляете на доставку. При использовании такого подхода вы не можете принимать новые заказы, пока не отправите текущий. Это синхронный подход, который блокирует ваш ввод/вывод до завершения обработки долгосрочной задачи.

Вместо синхронного подхода вы можете нанять дополнительного работника на склад. Таким образом, вы можете принять новый заказ, записать его на листок бумаги (назовем его задачей) и поместить заказ в очередь.

Один из работников склада возьмет одну из задач в очереди, подготовит заказ и отправит его на доставку. Затем работник вернется к очереди, возьмет следующую задачу и начнет ее обрабатывать. Это асинхронный подход. Хотя найм дополнительных работников потребует больше ресурсов, вы сможете обрабатывать больше заказов без блокировки ввода/вывода, что приведет к лучшей производительности и удовлетворенности клиентов.

Теперь давайте посмотрим, как вы могли бы реализовать идею задачи, работника и очереди в Laravel. Надеюсь, вы уже создали новый проект Laravel где-то на вашем компьютере. Откройте проект в вашей любимой среде разработки и обновите код для файла routes/web.php следующим образом:

Route::get('/', function () {

    dispatch(function() {
        sleep(5);
        logger('job done!');
    });

    return view('welcome');
});

Функция-помощник dispatch() в Laravel отправляет новую задачу в очередь. Здесь задачей является обычный PHP-код в виде замыкания или класса. Вы будете использовать замыкание сейчас, но я познакомлю вас с классами задач очень скоро.

Чтобы смоделировать долгосрочную задачу, я вставил явную задержку в пять секунд с помощью функции sleep в замыкании задачи. Затем я использовал функцию-помощник logger(), чтобы напечатать строку "задача выполнена!" в файле журнала проекта.

Запустите проект, выполнив команду php artisan serve или любым другим способом, который вам нравится, и посетите маршрут /. Вы увидите, как браузер "зависает" в состоянии загрузки в течение пяти секунд, а затем появится страница приветствия. Если вы посмотрите файл storage/logs/laravel.log в этот момент, вы увидите там напечатанную строку "задача выполнена!".

[2023-08-10 11:53:00] local.DEBUG: job done!  

Это означает, что задача успешно выполнена после пяти секунд задержки, но возникла проблема. Задача должна была выполняться асинхронно, не блокируя ввод/вывод, но, как вы только что заметили, браузер находился в состоянии загрузки в течение пяти секунд.

Чтобы понять, почему это произошло, откройте файл .env проекта и найдите переменную QUEUE_CONNECTION. Эта переменная указывает на подключение к используемой службе очереди. По умолчанию Laravel устанавливает значение этой переменной как sync, что означает, что фреймворк будет обрабатывать все задачи синхронно.

Системы управления очередями

Для возможности асинхронной обработки задач вам нужно будет использовать другие системы управления. Вы можете получить список предварительно настроенных систем управления в файле config/queue.php:

'connections' => [

    'sync' => [
        'driver' => 'sync',
    ],

    'database' => [
        'driver' => 'database',
        'table' => 'jobs',
        'queue' => 'default',
        'retry_after' => 90,
        'after_commit' => false,
    ],

    'beanstalkd' => [
        'driver' => 'beanstalkd',
        'host' => 'localhost',
        'queue' => 'default',
        'retry_after' => 90,
        'block_for' => 0,
        'after_commit' => false,
    ],

    'sqs' => [
        'driver' => 'sqs',
        'key' => env('AWS_ACCESS_KEY_ID'),
        'secret' => env('AWS_SECRET_ACCESS_KEY'),
        'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
        'queue' => env('SQS_QUEUE', 'default'),
        'suffix' => env('SQS_SUFFIX'),
        'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
        'after_commit' => false,
    ],

    'redis' => [
        'driver' => 'redis',
        'connection' => 'default',
        'queue' => env('REDIS_QUEUE', 'default'),
        'retry_after' => 90,
        'block_for' => null,
        'after_commit' => false,
    ],

],

Как видите, все проекты Laravel поставляются с пятью предопределенными конфигурациями систем управления. Я расскажу вам о всех из них, но для простоты давайте сначала используем подключение к базе данных. Чтобы сделать это, откройте файл .env проекта и измените значение QUEUE_CONNECTION на database вместо sync и сохраните файл.

QUEUE_CONNECTION=database

Затем выполните команду php artisan queue:table, чтобы сгенерировать скрипт миграции для создания таблицы задач, и проведите миграцию базы данных, выполнив команду php artisan migrate. Эта таблица будет хранить все задачи в очереди, пока обработчик не обработает их. Теперь очистите файл storage/logs/laravel.log и перейдите по маршруту /.

На этот раз вы не увидите задержку в выполнении кода, и страница будет отображена почти мгновенно, что указывает на то, что задача выполняется в фоновом режиме. После ожидания пяти секунд, если вы проверите файл storage/logs/laravel.log, вы обнаружите, что он пуст.

Обработчики очереди

На данном этапе у вас может возникнуть ощущение, что что-то пошло не так, хотя на деле все работает, как и ожидалось. Если вы используете клиентский инструмент (phpmyadmin...) для базы данных и посмотрите на таблицу задач в вашей базе данных, вы увидите, что фреймворк действительно добавил новую задачу в очередь, но нет обработчиков для ее выполнения.

Обработчик очереди — это обычный процесс, который работает в фоновом режиме и опрашивает бэкенд очереди на предмет необработанных задач. Чтобы запустить нового обработчика, выполните команду php artisan queue:work в каталоге вашего проекта. Обработчик запустится и начнет выполнение необработанной задачи немедленно.

INFO  Processing jobs from the [default] queue.  

 2023-08-10 12:28:16 Closure (web.php:18) ............... RUNNING
 2023-08-10 12:28:21 Closure (web.php:18) ............... 5s DONE

Посмотрите в файл storage/logs/laravel.log, и вы увидите, что строка "job done!" снова напечатана. Если вы используете класс задачи вместо замыкания, обработчик будет выводить имя задачи вместо слова closure в выводе. Чтобы в будущем обрабатывать задачи автоматически, вам нужно будет держать обработчика в работе. Я покажу вам, как держать процесс работающим в фоновом режиме, в следующем разделе.

Классы задач и способы отправки

Теперь, когда вы знакомы с основными понятиями, такими как задачи, очереди и обработчики, пришло время создать ваш первый класс задачи. Вы можете сделать это, выполнив команду php artisan make:job <name>. Создайте новую задачу и назовите ее, например, SendVerificationEmail.

artisan make:job SendVerificationEmail

INFO  Job [app/Jobs/SendVerificationEmail.php] created successfully.  

Классы задач в Laravel находятся в каталоге app/Jobs. Откройте файл app/Jobs/SendVerificationEmail.php в вашей IDE.

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class SendVerificationEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Создать новый экземпляр задачи.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Выполнить задачу.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

Каждый класс задачи реализует интерфейс ShouldQueue и содержит конструктор и метод handle(). Класс также по умолчанию включает в себя четыре трейта: Dispatchable, InteractsWithQueue, Queueable и SerializesModels. О них вы узнаете позже.

Как вы уже, возможно, догадались, в методе handle() происходит вся магия. Так что продолжайте и обновите метод handle() следующим образом:

/**
 * Выполнить задачу.
 *
 * @return void
 */
public function handle()
{
    sleep(5);
    logger('email sent!');
}

Теперь, когда вы перенесли код задачи из файла routes/web.php, вам нужно будет обновить маршрут, чтобы использовать класс задачи. Откройте файл routes/web.php и обновите его код следующим образом:

Route::get('/', function () {
    dispatch(new \App\Jobs\SendVerificationEmail());

    return view('welcome');
});

Изменение простое. Вместо передачи замыкания вспомогательному методу dispatch() теперь передается экземпляр класса SendVerificationEmail. Кроме того, вы можете отправить задачу, используя следующий синтаксис:

Route::get('/', function () {
    \App\Jobs\SendVerificationEmail::dispatch();

    return view('welcome');
});

Метод dispatch() в самом классе исходит из трейта Dispatchable, о которой я упоминал ранее. Кроме обычного метода dispatch(), трейт предоставляет несколько других методов, позволяющих обрабатывать задачу разными способами:

  • dispatchIf() — Отправить задачу с заданными аргументами, если пройдена проверка истины.
  • dispatchUnless() — Отправить задачу с заданными аргументами, если проверка истины не пройдена.
  • dispatchSync() — Отправить команду к соответствующему обработчику в текущем процессе.
  • dispatchAfterResponse() — Отправить команду к соответствующему обработчику после текущего процесса.
  • withChain() — Установить задачи, которые должны быть выполнены, если эта задача выполнена успешно.

Эти краткие описания я взял непосредственно из исходного кода трейта. Давайте рассмотрим некоторые из этих методов на практике.

Условия и способы запуска задач

Метод dispatchIf(), например, позволяет отправить задачу на основе условия. Обновите код файла routes/web.php следующим образом:

Route::get('/', function () {
    $userSignedUp = false;

    \App\Jobs\SendVerificationEmail::dispatchIf($userSignedUp);

    return view('welcome');
});

Одну вещь нужно запомнить: каждый раз, когда вы вносите изменения в код вашей задачи, вам необходимо перезапустить обработчик очереди. В противном случае очередь не учтет изменения и выполнит предыдущий код.

Теперь, если вы перейдете на маршрут /, вы не увидите новых запущенных задач. Однако если вы измените значение переменной $userSignedUp на true и перейдете на маршрут /, задача будет выполнена немедленно.

Метод dispatchUnless() работает так же, как и dispatchIf(), но позволяет выполнить задачу только в случае, если данное условие ложно, например, отправлять письмо с подтверждением, если пользователь не зарегистрирован (что является странным условием).

Для тестирования этого метода установите значение переменной $userSignedUp как true и замените строку SendVerificationEmail::dispatchIf($userSignedUp); на SendVerificationEmail::dispatchUnless($userSignedUp);. Сохраните файл, перейдите на маршрут /, и вы не увидите ни одной новой запущенной задачи.

Методы dispatchSync() и dispatchAfterResponse() позволяют выполнить задачу синхронно. Разница между этими методами заключается в том, что dispatchSync() выполняет задачу немедленно, тогда как dispatchAfterResponse() сначала возвращает ответ пользователю, а затем выполняет задачу перед закрытием соединения. Ни один из этих методов не требует, чтобы в фоне работал обработчик, и вы можете использовать их для выполнения задач с коротким временем выполнения.

Метод withChain() позволяет выполнить цепочку дополнительных задач в случае успешного выполнения текущей задачи. Понятие цепочек и пакетов я обсужу в следующем разделе. Возможно, вам покажется сложным запомнить все эти методы, и на самом деле это не обязательно. Вы можете посмотреть исходный код трейта Illuminate\Foundation\Bus\Dispatchable или посетить справочник API Laravel, когда захотите.

Соединения и очереди

Вы уже видели в предыдущем разделе, что Laravel поставляется с несколькими предварительно настроенными бэкенд-соединениями, и до сих пор вы использовали соединение с базой данных. В этом разделе вы узнаете немного больше о других, а также углубитесь в работу с очередями. Давайте вернемся к файлу config/queue.php и посмотрим на доступные конфигурации соединений.

'connections' => [

    'sync' => [
        'driver' => 'sync'
    ],

    'database' => [
        'driver' => 'database',
        'table' => 'jobs',
        'queue' => 'default',
        'retry_after' => 90,
        'after_commit' => false
    ],

    'beanstalkd' => [
        'driver' => 'beanstalkd',
        'host' => 'localhost',
        'queue' => 'default',
        'retry_after' => 90,
        'block_for' => 0,
        'after_commit' => false
    ],

    'sqs' => [
        'driver' => 'sqs',
        'key' => env('AWS_ACCESS_KEY_ID'),
        'secret' => env('AWS_SECRET_ACCESS_KEY'),
        'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
        'queue' => env('SQS_QUEUE', 'default'),
        'suffix' => env('SQS_SUFFIX'),
        'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
        'after_commit' => false,
    ],

    'redis' => [
        'driver' => 'redis',
        'connection' => 'default',
        'queue' => env('REDIS_QUEUE', 'default'),
        'retry_after' => 90,
        'block_for' => null,
        'after_commit' => false,
    ],

],

Как видите, массив connections содержит индивидуальные конфигурации для различных бэкендов очередей. Эти конфигурации имеют разные параметры, но наиболее часто используемыми являются driverqueue и after_commit.

Люди часто путаются в разнице между драйвером и именем соединения. Имя соединения может быть любым. Необязательно называть его "database", если очередь использует базу данных в качестве бэкенда. Вы можете назвать его как угодно. Драйвер, однако, должен соответствовать одному из имен, предлагаемых фреймворком.

Теперь это значит, что вы ограничены только этими пятью вариантами драйверов? А если вы хотите использовать RabbitMQ в качестве бэкенда очереди? В таких случаях Laravel позволяет использовать сторонние драйверы очередей. Возьмите, например, очень популярный vyuldashev/laravel-queue-rabbitmq. Он позволяет использовать RabbitMQ в качестве бэкенда очереди, как встроенные.

Работа с несколькими соединениями

Я уже объяснил, как установить соединение очереди по умолчанию в файле среды проекта .env с использованием переменной QUEUE_CONNECTION, но что, если вы хотите иметь несколько соединений? Вы также можете это сделать. По умолчанию, когда вы отправляете задачу, фреймворк помещает ее в соединение очереди по умолчанию. Однако, если вы хотите поместить задачу в другое соединение очереди, такое как redis, вы можете использовать метод onConnection().

Route::get('/', function () {
    \App\Jobs\SendVerificationEmail::dispatch()
    ->onConnection('redis');

    return view('welcome');
});

Таким образом, вы можете гибко управлять соединениями и отправкой задач в разные очереди, используя различные соединения. Это может быть полезно в ситуациях, когда у вас есть разные типы задач, которые требуют различных настроек или ресурсов для выполнения, и вы хотите явно указать, в какую очередь должна пойти каждая задача.

Работа с несколькими очередями

Помимо нескольких соединений, могут быть и разные очереди. Опция queue указывает очередь по умолчанию для соединения. Таким образом, если вы отправляете задачу без указания очереди, фреймворк поместит задачу в очередь по умолчанию. Представьте себе сценарий, в котором у вас есть задачи, которые должны быть выполнены до всех остальных, независимо от того, какая из них поступит первой. Для этого обновите маршрут следующим образом:

Route::get('/', function () {
    \App\Jobs\SendVerificationEmail::dispatch();

    \App\Jobs\SendVerificationEmail::dispatch()
    ->onQueue('high-priority');

    return view('welcome');
});

Вы отправляете одну и ту же задачу дважды: в очередь по умолчанию, а затем в очередь с высоким приоритетом. Имя high-priority является вымышленным, и вы можете назвать очередь как угодно. Сначала запустите приложение и перейдите по маршруту /, не запуская рабочих. Затем посмотрите на таблицу базы данных заданий.

Как вы можете видеть, первая задача находится в очереди по умолчанию, а вторая - в очереди с высоким приоритетом. Отправка задачи в очередь с высоким приоритетом не сообщит фреймворку, какую задачу обработать первой. Вы должны изменить строку 'queue' => 'default' для соединения с базой данных на 'queue' => 'high-priority,default' в файле config/queue.php. Теперь запустите рабочего очереди, выполнив команду php artisan queue:work, и рабочий будет приоритизировать очередь, которая идет первой в значении конфигурации.

INFO  Processing jobs from the [high-priority,default] queues.  

  2023-08-10 13:15:37 App\Jobs\SendVerificationEmail ............... RUNNING
  2023-08-10 13:15:42 App\Jobs\SendVerificationEmail ............... 5s DONE
  2023-08-10 13:15:42 App\Jobs\SendVerificationEmail ............... RUNNING
  2023-08-10 13:15:47 App\Jobs\SendVerificationEmail ............... 5s DONE

Действительно, рабочий сначала обрабатывает задачу 2, а затем задачу 1, потому что у нее низкий приоритет. Вместо установки приоритета очереди в файле config/queue.php, вы можете установить его в команде queue:work, написав его как php artisan queue:work --queue="high-priority,default", что даст тот же эффект.

Дополнительные параметры конфигурации

Третьей наиболее распространенной опцией среди всех конфигураций является параметр after_commit. Если вы отправляете задачу в рамках транзакции базы данных, возможно, что в то время как задача выполняется, фреймворк еще не зафиксировал изменения в базе данных. В таких ситуациях, особенно если задача зависит от значений из базы данных, вещи могут выйти из-под контроля.

Установка параметра after_commit в значение true гарантирует, что фреймворк отправит задачу только после фиксации изменений в базе данных. Вы можете установить этот параметр глобально или для каждой задачи, об этом я расскажу вам позже.

Конфигурационная опция retry_after присутствует во всех конфигурациях бэкенда, кроме одной. Amazon SQS использует вместо этого параметры видимости SQS. Установив значение этой опции в 90 секунд, вы сообщаете фреймворку, что если задача не выполнится или не удастся в течение 90 секунд после обработки, фреймворк должен считать ее неудавшейся и попробовать снова. Как правило, вы должны установить это значение в максимальное количество секунд, которое задачи должны занимать для обработки.

Также есть тайм-ауты рабочих, которые вы можете установить, выполнив команду php artisan queue:work --timeout=60. Установка тайм-аута в 60 секунд означает, что если задача не выполнится или не удастся в течение 60 секунд, рабочий выдаст ошибку и завершит работу.

Официальная документация рекомендует установить тайм-аут меньше интервала повторения. В противном случае фреймворк может дважды выполнить вашу задачу, прежде чем вступит в силу тайм-аут рабочего.

Эффекты retry_after и timeout могут быть запутанными, но они становятся яснее, когда вы сталкиваетесь с ними несколько раз. Помимо этих четырех опций конфигурации, остальные в бэкенде sqs и beanstalkd в основном связаны с соединением бэкенда, такими как имя хоста и секретные ключи. Однако я хотел бы подробнее обсудить две опции из соединения redis.

Первая из них - это опция block_for. Если вы установите этот параметр в 5, рабочий сначала откроет соединение с Redis и проверит невыполненные задачи. Если он не найдет их, рабочий будет ждать 5 секунд, прежде чем снова проверить. Эта опция может сэкономить вам немало ценных ресурсов CPU, если вы настроите ее правильно.

Вторая - это опция connection. Видя значение по умолчанию, люди часто путаются, касается ли это множественных соединений очереди или чего-то еще. Чтобы понять эту опцию, сначала откройте файл config/database.php в вашем проекте, прокрутите его до конца и найдите код, похожий на следующий:

'redis' => [

    'client' => env('REDIS_CLIENT', 'phpredis')

    'options' => [
        'cluster' => env('REDIS_CLUSTER', 'redis'),
        'prefix' => env('REDIS_PREFIX', Str::slug(env('APP_NAME', 'laravel'), '_').'_database_')
    ],

    'default' => [
        'url' => env('REDIS_URL'),
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', null),
        'port' => env('REDIS_PORT', '6379'),
        'database' => env('REDIS_DB', '0')
    ],

    'cache' => [
        'url' => env('REDIS_URL'),
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', null),
        'port' => env('REDIS_PORT', '6379'),
        'database' => env('REDIS_CACHE_DB', '1')
    ]

]

Как видите, здесь есть две отдельные конфигурации соединения с Redis. Одно является соединением по умолчанию, а другое - соединением кэша. По умолчанию фреймворк использует соединение по умолчанию в качестве бэкенда очереди, но вы можете определить новые здесь, если захотите. Таким образом, опция connection в разделе redis внутри файла config/queue.php относится к соединению с базой данных по умолчанию для Redis.

Сбои заданий и повторные попытки выполнения

До сих пор вы работали с заданиями, которые выполнялись успешно, но это не всегда будет так. Как и другой код, задания могут сбоить, и в зависимости от ваших требований, вы можете либо пытаться выполнить задание снова, либо игнорировать его. Задание может сбоить из-за тайм-аута или исключения. Давайте сначала рассмотрим тайм-ауты.

Чтобы продемонстрировать это, вернитесь к файлу app/Jobs/SendVerificationEmail.php и обновите его код следующим образом:

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class SendVerificationEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $timeout = 1;

    // ...

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        sleep(5);
        logger('email sent!');
    }
}

Публичное свойство timeout указывает, сколько времени рабочий процесс очереди должен дать заданию на выполнение. Установка тайм-аута задания в 1 указывает рабочему процессу, что это задание не должно выполняться дольше 1 секунды.

В методе handle() я оставил выражение sleep() без изменений. Теоретически рабочий процесс завершит задание через 1 секунду и пометит его как сбойное. Перезапустите рабочий процесс очереди (php artisan queue:work) и перейдите на маршрут /. Вы должны увидеть что-то похожее на следующее:

INFO  Processing jobs from the [default] queue.  

 2023-08-11 05:09:36 App\Jobs\SendVerificationEmail .............. RUNNING
 2023-08-11 05:09:37 App\Jobs\SendVerificationEmail .............. 997.44ms FAIL

Последняя строка в выводе указывает, что рабочий процесс очереди завершился. Я надеюсь, что вы уже поняли, что установка $timeout в 1 внутри задания аналогична его установке через команду queue:work --timeout=1. В производственной среде менеджер процессов, такой как supervisor, будет отвечать за перезапуск рабочего процесса очереди.

Еще одним способом сбоя задания может быть простое вызов исключения. Откройте файл app/Jobs/SendVerificationEmail.php и обновите его содержимое следующим образом:

<?php

namespace App\Jobs;

use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class SendVerificationEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // ...

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        throw new Exception();

        sleep(5);
        logger('email sent!');
    }
}

Выражение throw будет имитировать типичный сбой выполнения кода. Я также убрал тайм-аут. Если вы перезапустите рабочий процесс очереди и снова зайдете на маршрут /, вы увидите следующий вывод от рабочего процесса:

INFO  Processing jobs from the [default] queue.  

 2023-08-11 05:13:48 App\Jobs\SendVerificationEmail ............... RUNNING
 2023-08-11 05:13:48 App\Jobs\SendVerificationEmail ............... 13.45ms FAIL

Рабочий процесс попытался выполнить задание и пометил его как сбойное, как только было вызвано исключение. Возможно, вы задаетесь вопросом, куда уходят эти сбойные задания? Они сохраняются в базе данных. Если вы посмотрите на свою базу данных, увидите таблицу под названием failed_jobs, которая отвечает за хранение всех заданий, которые по тем или иным причинам не были выполнены успешно.

Таблица хранит важную информацию, такую как подключение, очередь, полезная нагрузка, исключение и время сбоя. Вместо того, чтобы изучать базу данных напрямую, вы также можете использовать команду php artisan queue:failed, чтобы получить список всех сбойных заданий.

2023-08-11 05:13:48 35ee7ede-ebf7-444d-b4ec-c31ad433ea78 ............... database@default App\Jobs\SendVerificationEmail  
2023-08-11 05:09:37 8afc4974-8a1c-4a98-b7f8-dfe49ebc392a ............... database@default App\Jobs\SendVerificationEmail  

Вы можете повторно пытаться выполнить задания, сохраненные в этой таблице, вручную или автоматически. Чтобы повторно пытаться выполнить задание вручную, используйте команду php artisan queue:retry. Команда принимает идентификатор задания в качестве аргумента.

php artisan queue:retry 35ee7ede-ebf7-444d-b4ec-c31ad433ea78

Фреймворк вернул задание обратно в очередь. Запустите процесс обработки очереди, и он немедленно обработает задание. Задание снова завершится сбоем, потому что вы запрограммировали его так, чтобы оно всегда сбоило. Даже если вы уберете значение $timeout, задание запомнит старую конфигурацию и повторит ее.

Команда queue:retry имеет также некоторые альтернативные синтаксисы. Она может принимать несколько идентификаторов заданий или даже диапазон идентификаторов.

Вы даже можете повторить все задания одновременно, выполнив команду:

php artisan queue:retry --all

Или, если вы хотите повторно пытаться выполнить сбойные задания из определенной очереди, можете выполнить команду:

php artisan queue:retry --queue=high-priority

Вы даже можете удалить сбойные задания из базы данных, выполнив команду:

php artisan queue:forget

Как и команда queue:retry, эта команда также принимает идентификатор задания в качестве аргумента.

Автоматические повторы

Мне известно, что повторное выполнение заданий с использованием команды artisan в рабочем окружении кажется странным, и я лично никогда так не делал. Вы либо будете использовать графический интерфейс (о котором я расскажу в следующем разделе), либо настроите задания на автоматический повтор. Чтобы задание автоматически повторялось, установите публичное свойство $tries в классе задания. Обновите содержимое файла app/Jobs/SendVerificationEmail.php следующим образом:

<?php

namespace App\Jobs;

use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class SendVerificationEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $tries = 5;

    // ...

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        throw new Exception();

        sleep(5);
        logger('email sent!');
    }
}

Теперь перезапустите обработчик очереди и перейдите на маршрут /. Вы увидите, что вместо того, чтобы считать задание неудачным после первого исключения, обработчик автоматически повторил его пять раз.

INFO Processing jobs from the [default] queue. 

 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ............... RUNNING
 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ............... 16.23ms FAIL
 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ............... RUNNING
 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ................ 6.49ms FAIL
 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ............... RUNNING
 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ................ 3.87ms FAIL
 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ............... RUNNING
 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ................ 3.76ms FAIL
 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ............... RUNNING
 2023-08-11 05:49:21 App\Jobs\SendVerificationEmail ................ 4.01ms FAIL

Как видите по выводу, обработчик пометил задание как неудачное только после пятого повтора. Если вы установите значение $tries = -1, обработчик будет повторять задание неограниченное количество раз. Кроме установки количества повторов, вы также можете настроить задание так, чтобы обработчик повторял его определенное количество раз. Обновите содержимое файла app/Jobs/SendVerificationEmail.php следующим образом:

<?php

namespace App\Jobs;

use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class SendVerificationEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $tries = -1;

    // ...

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        throw new Exception();

        sleep(5);
        logger('email sent!');
    }

    public function retryUntil()
    {
        return now()->addSeconds(2);
    }
}

Метод retryUntil() возвращает количество раз, сколько времени нужно повторять задание. Перезапустите обработчик очереди и снова зайдите на маршрут /. На этот раз вы увидите, что обработчик повторяет задание всего 2 секунды, а затем объявляет его неудачным.

Интервал автоматического повтора

Вы могли подумать, что 2 секунды — это очень короткий промежуток времени, но за это время обработчик попробовал выполнить задание 168 раз на моем локальном компьютере. Возможно, вы не хотите, чтобы это произошло. Возможно, вы хотите, чтобы обработчик ждал 1 секунду перед повторным выполнением задания. Публичное свойство $backoff делает именно это. Чтобы увидеть это в действии, обновите файл app/Jobs/SendVerificationEmail.php следующим образом:

<?php

namespace App\Jobs;

use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class SendVerificationEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $tries = -1;
    public $backoff = 1;

    // ...

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        throw new Exception();

        sleep(5);
        logger('email sent!');
    }

    public function retryUntil()
    {
        return now()->addSeconds(30);
    }
}

Перезапустите обработчик очереди и снова посетите маршрут /. На этот раз обработчик будет ждать 1 секунду перед обработкой задания. За 30 секунд обработчик повторит задание только 10 раз.

Свойство $backoff является одним из самых интересных. Предположим, вы хотите, чтобы задание было выполнено 5 раз, и хотите, чтобы обработчик ждал 1 секунду перед первой попыткой, 2 секунды перед второй попыткой, 3 секунды перед третьей попыткой, 4 секунды перед четвертой попыткой и 5 секунд перед пятой попыткой. Чтобы сделать это, обновите файл app/Jobs/SendVerificationEmail.php следующим образом:

<?php

namespace App\Jobs;

use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class SendVerificationEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $tries = 5;
    public $backoff = [1, 2, 3, 4, 5];

    // ...

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        throw new Exception();

        sleep(5);
        logger('email sent!');
    }
}

Думаю, вы можете понять, что означают значения $tries и $backoff. Перезапустите обработчик очереди и снова посетите маршрут /. На этот раз вы увидите, что время задержки увеличивается с каждой попыткой. Теперь, если вы установите $tries в 10 и оставите $backoff без изменений, последняя длительность будет сохраняться для остальных попыток. Таким образом, после пятой попытки остальные попытки будут иметь 5-секундную задержку.

Обработка сбоев(исключений) в заданиях

Есть ещё два концепта, которые я хотел бы обсудить касательно ошибок заданий и повторных попыток. Первый из них — это публичный метод failed(). Вы можете использовать эту функцию для выполнения части кода после того, как задание завершилось неудачей. Это что-то вроде оператора catch для заданий. Второй — это максимальное число разрешенных исключений. Вы можете добиться довольно интересных результатов, сочетая эти методы и свойства.

Рассмотрим сценарий, в котором вы хотите вернуть неудачное задание обратно в очередь на основании определенного условия, но также хотите ограничить количество разрешенных исключений. Чтобы увидеть это в действии, обновите содержимое файла app/Jobs/SendVerificationEmail.php следующим образом:

<?php

namespace App\Jobs;

use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class SendVerificationEmail implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public $maxExceptions = 5;

    // ...

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        throw new Exception('Failed!', 999);

        sleep(5);
        logger('email sent!');
    }

    public function failed(Throwable $th)
    {
        if ($th->getCode() == 999) {
            $this->release();
        } else {
            logger($th->getMessage());
        }
    }
}

Внутри метода failed(), я возвращаю задание обратно в очередь, если оно столкнулось с кодом ошибки 999. Метод release() происходит из контракта InteractsWithQueue, и код ошибки 999 является вымышленным. Я также установил максимальное количество разрешенных исключений равное 5, поэтому, если задание выдает 5 подряд исключений, обработчик будет считать его неудачным.

Я мог бы использовать свойство $tries здесь, но в этом случае обработчик попытался бы выполнить задание вне зависимости от кода исключения. Если вы используете $tries и $maxExceptions в одном задании, свойство $maxExceptions переопределит свойство $tries. Перезапустите обработчик очереди и снова посетите маршрут /. Вы увидите, что задание будет повторено 5 раз и объявлено неудачным.

Группировка заданий: цепочки и пакеты

До этого момента в данной статье вы работали с отдельными заданиями, но задания также могут быть упорядочены в цепочки или пакеты. Оба эти варианта представляют собой группы заданий. Основное отличие между пакетом и цепочкой заключается в том, что задания в пакете обрабатываются одновременно, тогда как задания в цепочке обрабатываются последовательно.

Цепочки

В цепочке задача зависит от предыдущей. Таким образом, если вы создадите цепочку из трех заданий, обработчик выполнит их последовательно; если любое из заданий завершится сбоем, последующие задания обработаны не будут. Чтобы увидеть это в действии, создайте три новых задания внутри проекта. Для этого выполните следующие команды в каталоге вашего проекта:

php artisan make:job EncodeVideoClip
php artisan make:job GenerateSubtitles
php artisan make:job PublishVideoClip

Теперь откройте файл routes/web.php и обновите код обработчика маршрута /:

use App\Jobs\EncodeVideoClip;
use App\Jobs\GenerateSubtitles;
use App\Jobs\PublishVideoClip;

Route::get('/', function () {

    \Illuminate\Support\Facades\Bus::chain([
        new EncodeVideoClip,
        new GenerateSubtitles,
        new PublishVideoClip,
    ])->dispatch();

    return view('welcome');
});

Этот вымышленный фрагмент кода может быть частью платформы для обмена видео, и он описывает необходимые шаги для публикации нового видеоролика на платформе. В процессе публикации три шага:

  1. Кодировать видеоролик в оптимизированный формат.
  2. Генерировать субтитры, анализируя аудиопоток.
  3. Опубликовать видео на платформе.

Эти три шага обрабатываются в трех отдельных заданиях на сервере, и если одно из них завершится сбоем, остальные задания не будут обработаны. Метод chain() в фасаде Illuminate\Support\Facades\Bus возвращает экземпляр класса Illuminate\Foundation\Bus\PendingChain, который похож на массив из нескольких заданий и включает в себя методы, такие как метод dispatch(). Чтобы увидеть это в действии, запустите нового рабочего в очереди и перейдите по маршруту /:

INFO  Processing jobs from the [default] queue.  

 2023-08-11 06:15:58 App\Jobs\EncodeVideoClip .................. RUNNING
 2023-08-11 06:15:58 App\Jobs\EncodeVideoClip .................. 60.49ms DONE
 2023-08-11 06:15:58 App\Jobs\GenerateSubtitles ................ RUNNING
 2023-08-11 06:15:58 App\Jobs\GenerateSubtitles ................ 4.92ms DONE
 2023-08-11 06:15:58 App\Jobs\PublishVideoClip ................. RUNNING
 2023-08-11 06:15:58 App\Jobs\PublishVideoClip ................. 2.74ms DONE

Как вы можете видеть из вывода, обработчик действительно обработал три задания последовательно. Теперь откройте файл app/Jobs/GenerateSubtitles.php и обновите его код следующим образом:

<?php

namespace App\Jobs;

use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class GenerateSubtitles implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // ...

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        throw new Exception();
    }
}

Генерация исключения во втором задании теоретически должна остановить обработчик от выполнения последующих заданий в очереди. Перезапустите рабочего в очереди и перейдите по маршруту /:

INFO  Processing jobs from the [default] queue.  

 2023-08-11 06:19:33 App\Jobs\EncodeVideoClip .................... RUNNING
 2023-08-11 06:19:33 App\Jobs\EncodeVideoClip .................... 40.96ms DONE
 2023-08-11 06:19:33 App\Jobs\GenerateSubtitles .................. RUNNING
 2023-08-11 06:19:33 App\Jobs\GenerateSubtitles .................. 3.20ms FAIL

Как видите, обработчик остановился, как только столкнулся с исключением во втором задании. Класс Illuminate\Foundation\Bus\PendingChain также включает метод chain() для обработки сбоев в цепочках заданий. Откройте файл routes/web.php снова и обновите код обработчика маршрута /:

use App\Jobs\EncodeVideoClip;
use App\Jobs\GenerateSubtitles;
use App\Jobs\PublishVideoClip;

Route::get('/', function () {

    \Illuminate\Support\Facades\Bus::chain([
        new EncodeVideoClip,
        new GenerateSubtitles,
        new PublishVideoClip,
    ])->catch(function (Throwable $th) {
        Log::error('The job failed with code: ' . $th->getCode());
    })
    ->dispatch();

    return view('welcome');
});

Теперь посмотрите в файл storage/logs/laravel.log, и вы должны увидеть что-то вроде следующего кода:

[2023-08-11 06:23:00] local.ERROR: The job failed with code: 0  
[2023-08-11 06:23:00] local.ERROR:  {"exception":"[object] (Exception(code: 0):  at /var/www/html/app/Jobs/GenerateSubtitles.php:26)
[stacktrace]
#...........

Конечно, в реальной жизни у вас будет гораздо более сложный механизм обработки ошибок, но на данный момент я надеюсь, что вы поняли идею. Вы также можете переопределить очередь или соединение по умолчанию, вызвав методы onQueue() или onConnection().

use App\Jobs\EncodeVideoClip;
use App\Jobs\GenerateSubtitles;
use App\Jobs\PublishVideoClip;

Route::get('/', function () {

    \Illuminate\Support\Facades\Bus::chain([
        new EncodeVideoClip,
        new GenerateSubtitles,
        new PublishVideoClip,
    ])->onConnection('database')
    ->onQueue('video-processing')
    ->catch(function (Throwable $th) {
        Log::error('The job failed with code: ' . $th->getCode());
    })
    ->dispatch();

    return view('welcome');
});

Пакеты задач

Помимо цепочек задач, существуют также пакеты задач. Пакеты похожи на цепочки, но они представляют собой коллекцию независимых друг от друга задач, и рабочий процесс обрабатывает пакет задач одновременно. Чтобы увидеть это в действии, откройте файл routes/web.php и обновите код обработчика маршрута /:

use App\Jobs\EncodeVideoClip;

Route::get('/', function () {
    \Illuminate\Support\Facades\Bus::batch([
        new EncodeVideoClip,
        new EncodeVideoClip,
        new EncodeVideoClip,
    ])->dispatch();

    return view('welcome');
});

Как и метод chain(), метод batch() возвращает экземпляр класса Illuminate\Bus\PendingBatch, который включает в себя множество полезных методов для работы с пакетами задач. Прежде чем вы увидите это в действии, вам нужно будет создать таблицу job_batches в вашей базе данных. Для этого выполните следующие команды:

php artisan queue:batches-table
php artisan migrate

Как название подразумевает, эта таблица хранит информацию о пакетах задач. Помимо создания этой таблицы, вы также должны добавить черту Illuminate\Bus\Batchable к каждой задаче внутри пакета. Итак, откройте файл app/Jobs/EncodeVideoClip.php и обновите его код следующим образом:

<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class EncodeVideoClip implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    // ...
}

Теперь перезапустите обработчик очереди. Убедитесь, что окно терминала, в котором работает обработчик, видно, и снова посетите маршрут /. Вы увидите все три задачи одновременно на терминале. Если вы посмотрите на таблицу job_batches с помощью инструмента базы данных, вы увидите информацию о пакете задач в таблице.

Как вы видите, она содержит информацию, такую как общее количество задач, количество ожидающих задач, количество неудачных задач и многое другое. Я уже сказал, что задачи в пакете не зависят друг от друга. Однако, если любая из задач в пакете завершится неудачей, рабочий процесс объявит весь пакет отмененным. Вы можете переопределить это поведение по умолчанию, вызвав метод allowFailures() перед отправкой задачи.

use \App\Jobs\EncodeVideoClip;

Route::get('/', function () {
    \Illuminate\Support\Facades\Bus::batch([
        new EncodeVideoClip,
        new EncodeVideoClip,
        new EncodeVideoClip,
    ])->allowFailures()
    ->dispatch();

    return view('welcome');
});

Помимо этого метода, класс Illuminate\Bus\PendingBatch также включает в себя обычные методы, такие как then(), catch() и finally().

use App\Jobs\EncodeVideoClip;

Route::get('/', function () {
    \Illuminate\Support\Facades\Bus::batch([
        new EncodeVideoClip,
        new EncodeVideoClip,
        new EncodeVideoClip,
    ])->allowFailures()
    ->onConnection('database')
    ->onQueue('video-encoding')
    ->then(function(\Illuminate\Bus\Batch $batch) {
        logger('Batch ' . $batch->id . ' finished successfully!');
    })
    ->then(function(\Illuminate\Bus\Batch $batch) {
        logger('Batch ' . $batch->id . ' did not finish successfully!');
    })
    ->finally(function(\Illuminate\Bus\Batch $batch) {
        logger('Cleaning leftovers from batch ' . $batch->id);
    })
    ->dispatch();

    return view('welcome');
});

Метод then() запускается только в случае успешной обработки пакета. Метод catch() запускается, если произошла ошибка, а метод finally() запускается независимо от ошибки. Вы можете использовать комбинацию этих трех методов для создания довольно сложных рабочих процессов. Я также заметил, что в случае пакетов существуют методы onConnection() и onQueue().

Есть еще одна интересная вещь, которую вы можете сделать с цепочками и пакетами: отправлять пакеты цепочек. Я знаю, это звучит сложно, но вынесите со мной. Откройте файл routes/web.php и обновите код обработчика маршрута /:

use App\Jobs\EncodeVideoClip;
use App\Jobs\GenerateSubtitles;
use App\Jobs\PublishVideoClip;

Route::get('/', function () {
    \Illuminate\Support\Facades\Bus::batch([
        [
            new EncodeVideoClip,
            new GenerateSubtitles,
            new PublishVideoClip,
        ],
        [
            new EncodeVideoClip,
            new GenerateSubtitles,
            new PublishVideoClip,
        ],
    ])->dispatch();

    return view('welcome');
});

В этом случае вам придется поместить цепочки в массив пакетов и передать их методу batch() в качестве параметра.

Удалите исключение из файла app/Jobs/GenerateSubtitles.php, не забудьте добавить трейт Illuminate\Bus\Batchable ко всем трем задачам, перезапустите обработчик очередей.

Главное, что следует здесь понимать, это то, что пакеты являются гибким и мощным инструментом, который позволяет группировать задачи вместе и обрабатывать их одновременно, независимо от их порядка или зависимостей.

Пакеты могут быть очень мощным инструментом, но как и всё в программировании, их следует использовать осторожно и с пониманием. Если использовать их неправильно, пакеты могут привести к неожиданным результатам или даже нарушить работу вашего приложения. Но если вы понимаете их функцию и как их правильно реализовать, они могут стать настоящим достоянием вашего приложения на Laravel.

У вас есть вопросы по использованию пакетов или нужно что-то ещё прояснить? Не стесняйтесь спрашивать!

Подводя итоги

  1. Пакеты заданий (Batches): Пакеты представляют собой группировку заданий, которые обрабатываются параллельно, в отличие от цепочек, где задания обрабатываются последовательно.
  2. Создание таблицы для пакетов: Для работы с пакетами необходимо создать специальную таблицу в базе данных. В статье приведены команды для выполнения этого.
  3. Использование Trait Batchable: Для каждого задания в пакете следует использовать специальный trait Batchable.
  4. Методы для работы с пакетами: В статье описаны различные методы для работы с пакетами, такие как allowFailures()then()catch(), и finally(), которые можно использовать для создания сложных рабочих процессов.
  5. Пакеты цепочек и цепочки пакетов: Можно создавать пакеты из цепочек заданий и наоборот, организовывая еще более сложные структуры обработки заданий.
  6. Осторожность в использовании: В конце текста подчеркивается, что хотя пакеты являются мощным инструментом, они могут привести к неожиданным результатам при неправильном использовании.
  7. Интеграция с Laravel: Весь текст касается специфики работы с пакетами в рамках фреймворка Laravel, что делает его особенно актуальным для разработчиков, работающих с этим инструментом.

Эта статья может служить полезным руководством для тех, кто хочет глубже изучить механизмы параллельной обработки заданий в Laravel, и предоставляет конкретные примеры кода для различных сценариев использования.

Написать комментарий