Очереди

Содержание:


Введение

На текущий момент Ларавел предлагает Horizon, красивая панел и система конфигурации для ваших очередей с использованием Redis. Просмотрите документацию Horizon для большей информации.

Очереди Ларавел предоставляют унифицированный API для большого количества различных бэкендов, таких как Beanstalk, Amazon SQS, Redis или даже реляционная база данных. Очереди позволяют вам отложить выполнение задач, занимающих много времени, таких как отправка электронных сообщений, до определённого момента. Это позволяет многократно ускорить обработку текущих запросов в ваше приложение.

Конфигурация очередей находится в файле config/queue.php. В этом файле вы найдёте конфигурацию соединения для каждого из драйверов очереди, которые включены в фреймворк: Beanstalkd, Amazon SQS, Redis и драйвер синхронизации, который мгновенно выполняет задания (для локальной разработки). Также включен драйвер null, который сбрасывает поставленные в очередь задания.

Коннекторы или очереди

Перед началом работы с очередями Ларавел, важно понимать разницу между "коннекторами" и "очередями". В вашем файле конфигурации config/queue.php, есть опция конфигурации connections. Эта опция определяе конктретное соединение с сервисом бэкэнда таким как Amazon SQS, Beanstalk, or Redis. Однако, любая выбранная очередь может иметь несколько соединений, которые можно рассматривать как различные стаки задач очереди.

Важно заметить, что пример конфигурации каждого соединения в файле queue содержит атрибут queue. Это базовая очередь, в которую будут отправляться задания, при отправке их в данное соединение. Другими словами, если вы отправляте задания без точного указания очереди, задание будет добавлено в очередь, указанную в файле конфигурации опции queue:

// This job is sent to the default queue...
Job::dispatch();

// This job is sent to the "emails" queue...
Job::dispatch()->onQueue('emails');

Некоторым приложениям может вообще никогда не потребуется распределять задания по очередям, а будет доступна одна простая очередь. Однако, распределение заданий по разным очередям может быть особенно полезным для приложений, которые хотели бы расставлять приоритеты или сегментировать выполнение заданий. Ларавел предоставляет возможность реализовать задуманное. Например, если вы добавляете задание в очередь high, вы можете запустить процесс и присвоить высокий приоритет:

php artisan queue:work --queue=high,default

Требования и заметка о драйвере

База данных

Чтобы использовать драйвер database, вам потребует таблица базы данных для хранения заданий. Для генерации миграции, которая создаст таблицу, выполните команду Artisan queue:table. После создания миграции, внесите изменения в бд в обычном режиме используя команду migrate:

php artisan queue:table

php artisan migrate

Redis

Чтобы использовать драйвер очереди redis, вам следует настроить соединение с бд Redis в вашем файле конфигурации config/database.php.

Кластер Redis

Если ваше Redis соединение очереди использует кластер Redis, ваши имена очередей должны содержать ключевой хэш-тег. Это требуется для того, чтобы гарантировать, что все ключи Redis для данной очереди помещены в один и тот же хэш-слот:

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

Блокировка

При использовании очереди Redis, вы можете использовать опцию конфигурации block_for, чтобы указать, как долго драйвер должен ожидать момента, когда будет возможным возобновить цикл выполнения заданий.

Принятие этого значения в зависимости от загрузки очереди может быть эффективным, чем отправка постоянных запросов в БД Redis за новыми заданиями. Например, вы можете установить значение 5, чтобы обозначить, что драйвер должен заблокировать на 5 секунд до момента, когда задачи станут снова доступными:

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],

Другие условия для драйвера

Следующие зависимости необходимы для перечисленных драйверов очереди:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0

Создание заданий

Генерация классов заданий

По умолчанию, все задания для очереди для вашего приложения находятся в папке app/Jobs. Если папка app/Jobs не существует, она будет создана, когда вы выполните команду Artisan make:job. Можно создать новое задание очереди используя командную строку Artisan:

php artisan make:job ProcessPodcast

Сгенерированный класс будет расширять интерфейс Illuminate\Contracts\Queue\ShouldQueue, показывающий, что задание должно быть добавлено в очередь и выполняться асинхронно.

Структура классов

Классы заданий очень простые, обычно содержащие только метод handle, который вызывается при обработке задания очередью. Для начала, давайте взглянем на пример класса задания. В этом примере, представим, что мы управляем службой загруженных файлов подкаста до момента публикации:

<?php

namespace App\Jobs;

use App\AudioProcessor;
use App\Podcast;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * Create a new job instance.
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the job.
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }
}

В этом примере, мы передаём модель Eloquent напрямую в конструктор задания для очереди. Из-за трейта SerializesModels, которую используют задание, модели Eloquent будут изящно сериализованы и десериализованы при обработке задания. Если ваше задание из очереди принимает модель Eloquent в конструкторе, только идентификатор для модели будет сериализовано в очередь. После выполнения задания, система работы с очередью автоматически переполучат полную модель из базы данных. Это полностью прозрачно для вашего приложения и предотвращает проблемы, которые могут позникнуть при сериализации полоной модели Eloquent.

Метод handle вызывает тогда, когда задание проходит обработку очереди. Важно отметить, что мы можем вписывать зависимости в метод задания handle. Сервис контейнер Ларвавел автоматически внедрит эти зависимости.

Если вы хотите иметь полный контроль над тем как контейнер внедряет зависимости в метод handle, вам стоит использовать метод контейнера bindMethod. Метод bindMethod принимает обратную связь, контейнер. Внетри обратной связи вы может вызвать метод handle тогда, когда пожелаете. Обычно, вы должны вызвать этот метод из сервис провайдера:

use App\Jobs\ProcessPodcast;

$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
    return $job->handle($app->make(AudioProcessor::class));
});
Бинарные данные, такие как содержимое изображения, должны проводиться через функцию base64_encode до момента передачи в задание очереди. В противном случае, задание может быть сериализовано не совсем корректно в JSON в момент помещения в очередь.

Посредник для заданий

Промежуточный слой заданий позволяет вам обернуть пользовательскую логику вокруг выполнения заданий очереди, убирает шаблонный код в самих заданиях. Например, предположим, что метод handle запускает ограничение частоты запрос Ларавел Redis, чтобы разрешить обработку лишь одного задания каждые 5 секунд:

/**
 * Execute the job.
 *
 * @return void
 */
public function handle()
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('Lock obtained...');

        // Handle job...
    }, function () {
        // Could not obtain lock...

        return $this->release(5);
    });
}

Может код и является валидным, структура метода handle становится шумной в силу загромождения логикой лимитов Redis. Кроме того, эта логика лимитов должна дублироваться для любого другого задания, на которое мы хотим наложить лимит.

Вместо наложения лимитов в методе handle, можем опредилить средний слой для задания, который и будет накладывать ограничения. У Ларавел нет локации по умолчанию для заданий среднего слоя, поэтому можно располагать такое задание в любом месте вашего приложения. В этом примере, мы расположим в папке app/Jobs/Middleware:

<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * Process the queued job.
     *
     * @param  mixed  $job
     * @param  callable  $next
     * @return mixed
     */
    public function handle($job, $next)
    {
        Redis::throttle('key')
                ->block(0)->allow(1)->every(5)
                ->then(function () use ($job, $next) {
                    // Lock obtained...

                    $next($job);
                }, function () use ($job) {
                    // Could not obtain lock...

                    $job->release(5);
                });
    }
}

Как и в случае с средним слоем маршрутов, посредник задания получает обрабатываемое задание и обратную связь, которая необходима для продолжения обработки задания.

После создания посредника задания, его можно прикрепить к заданию путём возвращения задания из метода задания middleware. Этот метод не создаётся через команду Artisan make:job, поэтому вам необходимо добавить его вручную в определение класса:

use App\Jobs\Middleware\RateLimited;

/**
 * Get the middlewarwe the job should pass through.
 *
 * @return array
 */
public function middleware()
{
    return [new RateLimited];
}

Отправка заданий

Как только вы написали ваш класс заданий, вы можете отправить его используя метод dispatch на само же задание. Аргументы, передаваемые в метод dispatch будут отданы в конструктор задания:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast);
    }
}

Отложенная отправка

Если вы хотите отложит выполнение задания очереди, вы можете использовать метод delay при отправке задания. Например, давайте укажем, что типовое задание не должно быть доступно для обработки до 10 минут после отправки:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)
                ->delay(now()->addMinutes(10));
    }
}
Сервис очереди Amazon SQS может иметь максимальную задержку в 15 минут.

Синхронная отправка

Если вы хотите отправить задание в обработку немедленно (синхронно), вы можете использовать метод dispatchNow. При использовании этого метода, задание не будет добавлено в очередь, а начнёт выполняться мгновенно, внутри текущего процесса:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatchNow($podcast);
    }
}

Цепочки заданий

Цепочки заданий позволяют вам указывать список заданий очереди, которые должны быть запущены последовательно после успешного выполнения первичного задания. Если одно задание в последовательности проваливается, остальные задания не будут запущены. Для выполнения цепочки заданий очереди вы можете использовать метод withChain на любое из ваших распределяемых заданий:

ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch();
Особое внимание нужно уделить такому моменту, что метод $this->delete(), который удаляет задания, не остановит выполнение цепочки заданий. Цепочка прервётся только если задание будет провалено.

Соединение цепочки и очереди

Если вы хотите указать соединение по умолчанию и очередь, которая должна быть использована для цепочки заданий, вы можете использовать методы allOnConnection и allOnQueue. Эти методы указывают соединение и имя очереди, которые должны быть использованы до момента, когда в задании не будет явно указываться конкретное соединение или очередь:

ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

Настройка очередей и коннекторов

Отправка в определённую очередь

Передавая задания в различные очереди, вы можете таким образом проводить категоризацию ваших заданий очереди, определять число обработчиков, приписанных к различным очередям. Нужно понимать, что такой подход не отправляет задание в различные соединения очереди, как это указано в файле конфигурации, а только в определённые очереди внутри одного соединения. Чтобы указать определённую очередь используйте метод onQueue при отправке задания:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}

Отправка на определённое соединение

Если вы работает с несколькими соединениями очереди, вы можете указать в какое соединение необходимо добавлять задание. Чтобы указать соединение, используйте метод onConnection при отправке задания:

<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * Store a new podcast.
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // Create podcast...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}

Можете последовательно использовать методы onConnection и onQueue, чтобы указать соединение и очередь для задания:

ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

Как альтернатива, вы можете указать свойство connection в классе задания:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The queue connection that should handle the job.
     *
     * @var string
     */
    public $connection = 'sqs';
}

Указание максимального числа попыток / установка значений тайм-аута

Максимальное число попыток

Один из подходов, для указания максимального числа, попыток состоит в том, чтобы в командной строке Artisan указавать параметр --tries:

php artisan queue:work --tries=3

Конечно есть и другой подход для указать максимального числа попыток. Можно указывать этот параметры в классе задания. Это значение будет иметь приоритет над значением, указанном в командной строке:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 5;
}

Число попыток, основанное на времени

Как альтернатива, чтобы определить как много попыток можно совершить до провала задания, вы можете определить время задержки. Это позволяет заданию совершить любое количество попыток внутри данного временного промежутка. Для определения времени задержки задания, добавьте метод retryUntil в класс задания:

/**
 * Determine the time at which the job should timeout.
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addSeconds(5);
}
Вы также можете определить метод retryUntil для ваших слушателей событий очереди.

Тайм-аут

Возможность timeout оптимизирована для PHP 7.1+ и расширения PHP pcntl.

Как и все слуачае с указанием максимального числа секунд, время тайм-аута можно указывать используя --timeout в командной строке Artisan:

php artisan queue:work --timeout=30

Однако, вы можете также определить максимальное число секунд, за которое задание можно выполнять в классе самого задания. Указание тайм-аута в задании имеет приоритет над указанием в командной строке:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 120;
}

Ограничение скорости

Эта возможность требует того, чтобы ваше приложение могла взаимодействовать с сервером Redis.

Если ваше приложение взаимодействует с Redis, вы можете указывать задержку заданий по времени или параллелизму. Такая возможность может полезной, когда ваши задания очереди взаимодействуют с API, у которых есть определённые лимиты.

Например, используя метод throttle, мы можете регулировать выполенение данного типа заданий 10 раз каждый 60 секунд. Если не получается наложить запрет на выполнение заданий, тогда обычно задание отправляют обратно в очередь, чтобы повторить попытку через некоторое время:

Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});
В примере выше, key может быть строкой, которая точно показывает тип задания, на которое необходимо наложить лимит. Например, вы можете захотеть построить ключ основанный на классе имени и идентификатору модели Eloquent с которыми и работают. Реализация возврата задания обратно в очередь все равно увеличить число попыток attempts.

Как альтернатива, вы можете указать максимальное число обработчиков, которые могут одновременное обрабатывать данное задание. Это может быть полезным, когда задание очереди вносит изменения в ресурс, который должен модифицироваться только одним заданием одновременно. Например, используя метод funnel, вы можете наложить ограничения на данный тип задания, чтобы оно обрабатывалось только одним обработчиком:

Redis::funnel('key')->limit(1)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});
Используя лимиты, число попыток, необходимое для успешного выполнения задания, может быть сложным для определения. Поэтому, может быть полезным комбинировать ограничения по попыткам с попытками, основанных на времени.

Обработка ошибок

Если будет выкинуто исключение во время выполения задания, задание автоматически будет отправлено обратно в очередь, поэтому может быть вызвано для еще одной попытки обработки. Попытки обработать задание до тех пор, пока не достигнет максимального числа, позволенного вашим приложением. Это число максимальных попыток можно указать через --tries для команды Artisan queue:work. Еще число максимальных попыток может быть указано в самом классе задания. Больше информаци о обработчик очереди можно найти тут.


Замыкание очередей

Вместо отправки класса заданий в очередь, вы можете также отправить Замыкание. Это отлично для быстрых, просты заданий, которые необходимо выполнять вне данного цикла запросов:

$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

При отправке Замыкания в очередь, код содержания замыкания криптографически подписывается, поэтому не может быть изменён в пути.


Запуск обработчика очереди

Ларавел включает обработчика очереди, который обрабатывает новые задания по мере добавления в очередь. Вы можете запустить обработчика используя команду Artisan queue:work. Важно отметить, что после запуска команды queue:work, она будет продолжать выполняться до тех пор, пока вы вручную не остановите выполнение или не закроете терминал:

php artisan queue:work
Чтобы держать процесс queue:work в постоянной фоновой обработке, вам следует использовать монитор процесса, такой как Supervisor, чтобы гарантировать, что обработчик очереди не останавливает выполненине.

Запомните, обработчики очереди это долгоживущие процессы, и хранят запущенное приложение в памяти. Как результат, они не заметят изменией, которые вы внесли в код после запуска обработчиков. Поэтому во время разработки убедитесь в том, что перезапустили обработчиков очереди.

Как альтернатива, вы можете запустить команду queue:listen, и вам не придётся вручную перезапускать обработчика каждый раз при изменении кода. Однако, эта команда не такая эффективная как queue:work:

php artisan queue:listen

Указание соединения, очереди.

Вы также можете указать какое соединение очереди обработчик должен использовать. Имя соединения передаваемое в команду work должно соответствовать одному из соединений, указанных в вашем файле конфигурации config/queue.php:

php artisan queue:work redis

Вы можете настроить вашего обработчика очереди ещё точнее обрабатываю только отдельные очереди для данного соединения. Например, если все электронные письма обрабатываются в очереди emails на соединениии очереди redis, вы можете прописать следующую команду, чтобы обработчик обрабатывал только эту очередь:

php artisan queue:work redis --queue=emails

Обработка одиночного задания

Опция --once можно использовать для того, чтобы научить обработчик выполнять обработку только одиночного задания из очереди:

php artisan queue:work --once

Обработка всех заданий очереди и выход

Опцию --stop-when-empty можно использовать, чтобы научить обработчик обрабатывать все задания и элегантно выходить. Эта опция может быть полезна, например, при работе с контейнером Docker. Если вы хотите прекратить работу контейнера после завершения очереди:

php artisan queue:work --stop-when-empty

Размышления о потребляемых ресурсах

Обработчики очереди не "перезапускают" фреймворк перед моментом выполнения каждого задания. Поэтому, вам следует очищать тяжёлые ресурсы после завершения задания. Например, если вы проводите преобразование изображений с библиотекой GD, вы должны очистить память imagedestroy, когда работа завершена.

Приоритеты очередей

Иногда вы можете захотеть расставить приоритеты выполнения ваших очередей. Например, в файле конфигурации config/queue.php вы можете установить базово значение queue для соединения redis как low. При необходимости, вы можете добавить задание в очередь с приоритетом high таким образом:

dispatch((new Job)->onQueue('high'));

Для запуска обработчика, который проводит сверку, что все задания очереди high выполняются до заданий очереди low, передайте список названий очереди через запятую в команду work:

php artisan queue:work --queue=high,low

Рабочие очереди и развёртывание

В силу того, что обработчики это долгоживущие процессы, они не подбирают изменения в вашем коде без рестарта. Поэтому перезагрузка обраработчиков, самый простой способ взаимодействия с обработчиками во время разработки проекта. Перезапустить всех обработчиков можно командой queue:restart:

php artisan queue:restart

Эта команда научит всех обработчиков очереди правильно завершать работу; после выполнения текущего задания так, чтобы не потерять ни одно из существующих заданий. В силу того, что все обработчики умрут после выполнения команды queue:restart, вам следует запустить менеджер процессов такого как Supervisor для автоматического рестарта обработчиков очереди.

Очереди используют кэш для хранения сигналов перезапуска, поэтому вы должны убедиться в том, что кэш-драйвер настроен должным образом для вашего приложения до фактического использования данной возможности.

Истечение срока и тайм-ауты

Истечение срока задания

В вашем файле конфигурации config/queue.php, каждое соединение очереди определяет параметр retry_after. Эта опция указывает как много секунд соединение очереди должно ждать до момента перезапуска задания. Например, если значение retry_after установлено как 90, задание будет отправлено обратно в очередь только если оно обрабатывалось 90 секунд и не было удалено. Обычно, вы должны установить значение code>retry_after как максимальное число секунд, которое может занять обработка задания.

Единственное соединение очереди, которое не содержит значение retry_after, Amazon SQS, которое будет повторять задания основываясь на Базовом времени обработки, которое настраивается внутри AWS консоли.

Тайм-ауты обработчика

У команды Artisan queue:work есть опция тайм-аута --timeout. Эта опция указывает как долго основной процесс будет ждать до момента, когда он уничтожит дочернего обработчика, который обрабатывает задание. Иногда дочерний процесс очереди может стать "замороженым" по целому ряду причин, например, не отвечает вызов внешнего HTTP. Опция --timeout убирает замороженные процессы, которые превысили указанный лимит времени:

php artisan queue:work --timeout=60

Опция конфигурации retry_after и опция командной строки --timeout различаются. Но работают вместе, чтобы убедиться, что задания не потеряны, и что задания успешно выполнены.

Значение --timeout всегда должно быть как минимум на несколько секунд меньше, чем значение конфигурации retry_after. Это гарантирует, что обработчик в процессе выполнения данного задания всегда уничтожается до перезапуска задания. Если значение вашей опции --timeout больше чем опция конфигурации retry_after, тогда ваши задания могут обрабатываться дважды.

Время сна обработчика

Когда задания очереди доступны, обработчик будет продолжать обработку заданий без задержки между ними. Однако, опция sleep определяет как долго (в секундах) обработчик будет "спать", если нет новых доступных заданий. Во время сна, обработчик не выполняет никаких заданий. Задания будут обрабатываться после того, как обработчик возобновит активность снова.

php artisan queue:work --sleep=3

Конфигурация наблюдателя

Установка наблюдателя

Наблюдатель, Супервайзер от англ. Supervisor — это мониторинг процессов для операционной системы Linux, который автоматически перезапустит процесс queue:work в случае его провала. Чтобы установить Supervisor на Ubuntu вам необходимо использовать следующую команду:

sudo apt-get install supervisor
Если настройка наблюдателя самостоятельно кажется слишком сложным, то можно использовать Laravel Forge, который автоматически настроит наблюдателя для ваших проектов.

Настройка наблюдателя

Файлы конфигурации Supervisor обычно хранятся в директории /etc/supervisor/conf.d. Внутри этой папки вы можете создать любое количество файлов конфигурации, которые научат наблодателя как именно должны отслеживаться процессы. Например, давайте создадим файл laravel-worker.conf, который запускает и отслеживает процесс a queue:work:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

В этом примере, директива numprocs научит Supervisor запускать 8 процессов queue:work и следить за ними всеми, автоматически перезапускать их, если они закончатся провалом. Вам необходимо изменить queue:work sqs часть директивы command, чтобы показать нужное соединение очереди.

Запуск наблюдателя.

После создания файла конфигурации, вы можете обновить конфигурацию Supervisor, запустить процесс с помощью следующих команд:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

Для большей информации о Supervisor, обратитесь к документации.


Работа с проваленными заданиями

Иногда ваши задания очереди будут проваливаться. Не нужно волноваться, не всегда все идет по плану. Однако, Ларавел включает удобный указать максимальное число попыток для задания. После исчерпания лимита попыток, будет создана запись в базе данных таблице failed_jobs. Чтобы создать миграцию для таблицы failed_jobs, вы можете использовать команду queue:failed-table:

php artisan queue:failed-table

php artisan migrate

После чего, во время запуска обработчика очереди, вы можете указать максимальное число повторов для задания используя тег --tries для команды queue:work. Если не указать значение, то повтор будет один:

php artisan queue:work redis --tries=3

Дополнительно, вы можете указать как много секунд Ларавел должен ждать до повтора проваленного задания. Для этого необходимо использовать опцию --delay. По умолчанию, задание повторяется мгновенно:

php artisan queue:work redis --tries=3 --delay=3

Если вы хотите настроить задержку повтора проваленных заданий поштучно, то вам следует определить свойство retryAfter для вашего класса заданий очереди:

/**
 * The number of seconds to wait before retrying the job.
 *
 * @var int
 */
public $retryAfter = 3;

Очистка после проваленных заданий

Вы можете определить метод failed напрямую для вашего класса задания, что позволяет вам проводить своеобразную чистку, когда случается провал. Это прекрасное место, чтобы отослать сообщение вашим пользователям или сделать откат выполненных действий заданием. Исключение Exception, которое привело к ошибке, будет передано в метод failed:

<?php

namespace App\Jobs;

use App\AudioProcessor;
use App\Podcast;
use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * Create a new job instance.
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * Execute the job.
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // Process uploaded podcast...
    }

    /**
     * The job failed to process.
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // Send user notification of failure, etc...
    }
}

Проваленные задания для событий

Если вы хотите зарегистрировать событие, которое будет вызываться при провале задания, вы можете использовать метод Queue::failing. Это событие является прекрасной возможностью уведомить вашу команду через электронную почту или мессенджер, такоей как Slack. Например, мы можем прикрепить обратную связь к этому событию из AppServiceProvider, который включён в сборку Ларавел:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     *
     * @return void
     */
    public function register()
    {
        //
    }

    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

Повтор проваленных заданий

Для просмотра всех ваших проваленных заданий, которые были добавлены в вашу таблицу базы данных failed_jobs, вы можете использовать команду Artisan:

php artisan queue:failed

Команда queue:failed перечислит доступную информацию о задании: идентификатор, соеднинение, очередь, время неудачи. Идентифкатор задания можно использовать для повтора проваленного задания. Например, чтобы повторить задание с идентификатором 5, выполните следующую команду:

php artisan queue:retry 5

Для повтора всех проваленных заданий, выполните команду queue:retry и передайте all как идентификатор:

php artisan queue:retry all

Если вы хотите удалить проваленное задание, вы можете использовать команду queue:forget:

php artisan queue:forget 5

Для удаления всех ваших проваленных заданий, вы можете использовать команду queue:flush command:

php artisan queue:flush

Работа с потерянными моделями

При внедрении модели Eloquent в задание, перед помещение в очередь, модель подвергается сериализации и восстанавиливается при обработке задания. Однако, если модель была удалена во время ожидания обработки, ваше задание может быть провалено с исключением ModelNotFoundException.

Для удобства, вы можете выбрать автоматическое удаление заданий с потерянными моделями путём установки значения как true для свойства deleteWhenMissingModels:

/**
 * Delete the job if its models no longer exist.
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

События заданий

Используя команду before и after и методы для фасада Queue вы можете указать обратную связь для выполнения до или после выполнения задания очереди. Эта обратная связь является прекрасной возможностью для реализации дополнительного журнала записи или дополнить статистику для панели мониторинга. Обычно, вы должны вызвать эти методы из сервис провайдер. Например, мы можем использовать AppServiceProvider, включённый в Ларавел:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     *
     * @return void
     */
    public function register()
    {
        //
    }

    /**
     * Bootstrap any application services.
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

Использование метода looping для фасада Queue вы можете указать обратную связь, которая выполняется до попытки обработчика словить задание из очереди. Например, вы можете зарегистрировать Замыкание для отката любых транзакций, которые были открыты предыдущим заданием:

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});