
Laravel Queues/Horizon - Distribute jobs evenly across multiple worker servers

Overview

I have a Laravel 9.x application with several different job classes that run on Horizon. These jobs are very resource intensive, so I am exploring different approaches to scaling my queue workers horizontally. Currently, I have 3 dedicated worker servers that run Horizon, all connected to the same central Redis instance to poll for jobs.

My Problem

Say I have a job class called ProcessDocumentJob. I also have 3 instances of Horizon running, one per worker server. All 3 worker servers are listening to the same queue, document-processing. My issue is that if I were to dispatch the ProcessDocumentJob 3 times to the document-processing queue, there is no guarantee that all 3 of my worker servers would be used. What happens most of the time is that one of the three worker servers picks up all three jobs and tries to process them while the other two sit idle. What I would like is for the jobs to be distributed evenly (load balanced via a round robin approach) across all my worker servers.

My Current Solution

My current solution to distribute jobs evenly across my worker servers is to set a WORKER_QUEUE env var specific to each server. So for my 3 worker servers there are 3 different queues I can dispatch the ProcessDocumentJob to (worker-queue-1, worker-queue-2, worker-queue-3). Additionally, I'm using the HORIZON_ENV env var (set to "worker") to select the correct Horizon environment.

Here's a snippet of what my horizon.php config file looks like for this:

'environments' => [
    'worker' => [
        'worker-supervisor' => [
            'connection' => 'redis',
            // Each server sets its own WORKER_QUEUE, e.g. worker-queue-1.
            'queue' => [env('WORKER_QUEUE')],
            'balance' => 'simple',
            'maxProcesses' => 10,
            'maxJobs' => 0,
            'memory' => 512,
            'tries' => 3,
            'nice' => 0,
            'timeout' => 600,
        ],
    ],
],

The constructor of my ProcessDocumentJob job class looks like this:

public function __construct(Document $document)
{
    $this->onQueue(Worker::bestPick());

    $this->document = $document;
}

The Worker::bestPick() method is my round robin equivalent for dispatching jobs. It knows how many worker servers are available, each worker server's corresponding queue name, and which queue was used last, so when another ProcessDocumentJob comes in, it knows which worker queue is next in the chain to assign it to.
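A minimal sketch of the rotation described above, written as plain PHP so it can run without Laravel. The function name nextQueueInRotation() and its parameters are hypothetical and only illustrate the idea; where the "last used" value is stored (cache, Redis, database) is left out on purpose.

```php
<?php

declare(strict_types=1);

/**
 * Return the queue that follows $lastUsed in $queues, wrapping around to
 * the first queue. Hypothetical helper, not part of Laravel or Horizon.
 *
 * @param array<int, string> $queues   Ordered queue names, one per worker server.
 * @param string|null        $lastUsed Queue that received the previous job, if any.
 */
function nextQueueInRotation(array $queues, ?string $lastUsed): string
{
    // Normalise keys so the modulo arithmetic below is safe.
    $queues = array_values($queues);

    if ($queues === []) {
        throw new InvalidArgumentException('At least one queue is required.');
    }

    $lastIndex = $lastUsed === null ? false : array_search($lastUsed, $queues, true);

    // Missing or unknown "last used" queue: start from the beginning.
    if ($lastIndex === false) {
        return $queues[0];
    }

    // The modulo wraps the last queue back around to the first one.
    return $queues[($lastIndex + 1) % count($queues)];
}

$queues = ['worker-queue-1', 'worker-queue-2', 'worker-queue-3'];

echo nextQueueInRotation($queues, null), PHP_EOL;             // worker-queue-1
echo nextQueueInRotation($queues, 'worker-queue-1'), PHP_EOL; // worker-queue-2
echo nextQueueInRotation($queues, 'worker-queue-3'), PHP_EOL; // worker-queue-1
```

Keeping the rotation in a pure function like this makes it easy to unit test separately from whatever storage holds the last-used queue.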

The Drawback

This solution works well for the most part and all my jobs are distributed evenly across my worker servers. However, if one of my three worker servers goes down, this solution would not work unless I implemented some sort of uptime check into my Worker::bestPick() method.
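One possible shape for such an uptime check, sketched in plain PHP. It assumes each worker server periodically records a heartbeat timestamp somewhere shared (for example a scheduled task writing to the cache); that mechanism, the function name availableQueues(), and the 30 second threshold are all assumptions made for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Keep only the queues whose server reported a heartbeat recently.
 * Hypothetical helper, not part of Laravel or Horizon.
 *
 * @param array<string, int> $lastSeen Queue name => Unix timestamp of the last heartbeat.
 * @param int                $now      Current Unix timestamp.
 * @param int                $maxAge   Seconds after which a server counts as down (assumed value).
 * @return array<int, string> Sorted names of queues considered available.
 */
function availableQueues(array $lastSeen, int $now, int $maxAge = 30): array
{
    // Drop every queue whose heartbeat is older than the threshold.
    $alive = array_filter(
        $lastSeen,
        fn (int $seenAt): bool => ($now - $seenAt) <= $maxAge
    );

    $names = array_keys($alive);

    // A stable order keeps the round robin rotation predictable.
    sort($names);

    return $names;
}
```

The round robin pick would then rotate over availableQueues() instead of a fixed list, so a server that stops reporting simply drops out of the rotation.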

Additionally, I've also explored Redis's atomic locks and limiting the maxJobs field in my config file in an attempt to achieve what I'm after but had no success.

Questions

  • How do I distribute jobs evenly across all my worker servers similar to how a load balancer would distribute traffic via Round Robin to web servers?
  • Am I on the right path in trying to scale my worker servers horizontally with peak performance in mind? Or is there a more Laravel-friendly way of handling this?

And yes, I know these things

  • Queue systems are first-in, first-out.
  • The balance field can be set to "simple", "auto", or false, and none of these values appear to solve my problem.
  • Yes, I've read the Laravel docs on Horizon and Queues.

TL;DR

I get that time is money... but don't be lazy. Read the whole thing ^

Denis Priebe asked Nov 27 '25 04:11



1 Answer

This is my current solution to the problem I described. In short, I have a Worker class which is responsible for determining which queue/worker should receive the next job:

<?php

namespace App\Domain\Docs\Pdf\Worker;

use Illuminate\Support\Arr;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Str;
use Laravel\Horizon\Contracts\WorkloadRepository;
use Throwable;

class Worker
{
    /**
     * Get a list of all available, running "worker" queues in Horizon.
     */
    public static function queues(): array
    {
        $horizonWorkload = app(WorkloadRepository::class);

        try {
            $queues = $horizonWorkload->get();
        } catch (Throwable $exception) {
            // Throwable already covers Exception. If the workload cannot be
            // read, treat it as "no queues" so bestPick() uses its fallback.
            $queues = [];
        }

        // Target all queues with this prefix...
        $prefix = 'worker-';

        return collect($queues)
            ->filter(fn (array $queue) => Str::startsWith($queue['name'], $prefix))
            ->sortBy('name')
            ->values()
            ->toArray();
    }

    /**
     * Determines the next worker that should receive a job.
     *
     * This current implementation uses the round-robin approach which keeps track
     * of the last worker that received a job and returns the name of the next
     * worker that is in line for the next job.
     */
    public static function bestPick(): string
    {
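        $workers = static::queues();

        // Read the marker once instead of on every loop iteration.
        $lastWorkerUsed = Cache::get('last-worker-used');

        foreach ($workers as $key => $worker) {
            // Hand the job to the worker that follows the last one used, if there is one.
            if ($worker['name'] === $lastWorkerUsed && array_key_exists($key + 1, $workers)) {
                Cache::put('last-worker-used', $workers[$key + 1]['name'], now()->addMonth());

                return $workers[$key + 1]['name'];
            }
        }

        // No marker yet, marker not in the list, or end of the list reached:
        // wrap around to the first worker (or 'worker-1' if none are reported).
        $fallbackWorker = Arr::get($workers, '0.name', 'worker-1');

        Cache::put('last-worker-used', $fallbackWorker, now()->addMonth());

        return $fallbackWorker;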
    }
}
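One thing to watch in the class above: Cache::get() and Cache::put() are separate calls, so two dispatches happening at the same moment can read the same last-worker-used value and pick the same queue. A possible alternative, sketched below in plain PHP, derives the queue from a monotonically increasing counter. In Laravel the counter could come from Cache::increment() on a shared store such as Redis; that wiring, the cache key, and the helper name queueForCounter() are assumptions and are left out so the sketch runs on its own.

```php
<?php

declare(strict_types=1);

/**
 * Map a monotonically increasing counter onto a list of queue names.
 * Hypothetical helper, not part of Laravel or Horizon.
 *
 * @param array<int, string> $queues  Ordered queue names, one per worker server.
 * @param int                $counter Value returned by an atomic increment (1, 2, 3, ...).
 */
function queueForCounter(array $queues, int $counter): string
{
    // Normalise keys so index arithmetic is safe.
    $queues = array_values($queues);

    if ($queues === []) {
        throw new InvalidArgumentException('At least one queue is required.');
    }

    if ($counter < 1) {
        throw new InvalidArgumentException('Counter must start at 1.');
    }

    // The first increment yields 1, so shift by one to start at index 0.
    return $queues[($counter - 1) % count($queues)];
}
```

Because each dispatch receives its own counter value, two concurrent dispatches land on neighbouring queues rather than the same one, provided the increment itself is atomic on the chosen store.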

Then, in your job class(es), you can do the following:

<?php

namespace App\Domain\Docs\Pdf\Energizer;

use App\Domain\Docs\Pdf\Worker\Worker;
use App\Models\Pdf\Pdf;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPdfJob implements ShouldQueue
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;

    /**
     * The PDF we are processing.
     */
    protected Pdf $pdf;

    /**
     * Create a new processor instance for the PDF.
     */
    public function __construct(Pdf $pdf)
    {
        /**
         * This is a resource heavy job. Let's make sure that it is
         * processed evenly across all of our worker servers...
         */
        $this->onQueue(Worker::bestPick());

        $this->pdf = $pdf;
    }

    /**
     * Run the job.
     */
    public function handle(): void
    {
        // Job logic here...
    }
}

With this approach, ProcessPdfJob instances are handed out in round robin order across every server that is running Horizon with a worker queue. The Worker class I defined currently only supports the 'auto' balance strategy:

'worker' => [
    'worker-supervisor' => [
        'connection' => 'redis',
        'queue' => [env('WORKER_QUEUE')],
        'balance' => 'auto',
        'maxProcesses' => 10,
        'maxJobs' => 0,
        'memory' => 512,
        'tries' => 3,
        'nice' => env('WORKER_NICE', 0),
        'timeout' => 600,
        'sleep' => env('WORKER_SLEEP', 3),
    ],
],

[Screenshot: Horizon Workload]

As you can see in my Horizon Workload screenshot, I have three "worker" servers, and all the logic I outlined applies to them. My approach currently does not factor in server CPU usage or the number of jobs each server is running, but I believe the WorkloadRepository class provides some of that data, so modifying my Worker class to take it into account shouldn't be too difficult.
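A rough sketch of what a workload-aware pick could look like, in plain PHP. It assumes each workload entry carries a 'length' key (pending jobs) alongside the 'name' key used above; that key, the function name leastLoadedQueue(), and the fallback value are assumptions, so check the actual shape returned by WorkloadRepository::get() before relying on it.

```php
<?php

declare(strict_types=1);

/**
 * Pick the worker queue with the fewest pending jobs.
 * Hypothetical helper; the 'length' key is an assumed part of each entry.
 *
 * @param array<int, array<string, mixed>> $workload Entries such as ['name' => 'worker-1', 'length' => 4].
 * @param string $prefix   Only queues starting with this prefix are considered.
 * @param string $fallback Returned when no matching queue is reported.
 */
function leastLoadedQueue(array $workload, string $prefix = 'worker-', string $fallback = 'worker-1'): string
{
    // Ignore queues that do not belong to the dedicated worker servers.
    $candidates = array_filter(
        $workload,
        fn (array $queue): bool => str_starts_with((string) ($queue['name'] ?? ''), $prefix)
    );

    if ($candidates === []) {
        return $fallback;
    }

    // Sort by pending jobs first, then by name so ties resolve predictably.
    usort(
        $candidates,
        fn (array $a, array $b): int => [$a['length'] ?? 0, $a['name']] <=> [$b['length'] ?? 0, $b['name']]
    );

    return $candidates[0]['name'];
}
```

Note that queue length only counts jobs still waiting; a server busy with one long-running job and an empty queue would still look idle, so this is a starting point rather than a full load measure.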

Denis Priebe answered Nov 29 '25 19:11
