
How to accommodate Amazon FIFO SQS in Laravel queue?

Amazon has announced their new FIFO SQS service and I'd like to use it in Laravel Queue to solve some concurrency issues.

I've created several new queues and changed the configurations. However, I got a MissingParameter error which says

The request must contain the parameter MessageGroupId.

So I modified the file vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php as follows:

public function pushRaw($payload, $queue = null, array $options = [])
{
    $response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]);

    return $response->get('MessageId');
}

public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $delay = $this->getSeconds($delay);

    return $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))
    ])->get('MessageId');
}

I'm using APP_ENV as the group ID. There is only a single message queue, so the exact value doesn't matter much; I just want everything to be FIFO.

But I'm still getting the same error message. How could I fix it? Any help would be appreciated.

(btw, where has the SDK defined sendMessage? I can find a stub for it but I didn't find the detailed implementation)

asked Nov 22 '16 04:11 by Frederick Zhang




1 Answer

For anyone else who stumbles across the same issue: editing SqsQueue.php directly can work, but anything changed under vendor/ is overwritten by the next composer install or composer update. A more durable alternative is to implement a new Illuminate\Queue\Connectors\ConnectorInterface for SQS FIFO and register it with Laravel's queue manager.

My approach is as follows:

  1. Create a new SqsFifoQueue class that extends Illuminate\Queue\SqsQueue but supports SQS FIFO.
  2. Create a new SqsFifoConnector class that extends Illuminate\Queue\Connectors\SqsConnector that would establish a connection using SqsFifoQueue.
  3. Create a new SqsFifoServiceProvider that registers the SqsFifoConnector to Laravel's queue manager.
  4. Add SqsFifoServiceProvider to your config/app.php.
  5. Update config/queue.php to use the new SQS FIFO Queue driver.

Example:

  1. Create a new SqsFifoQueue class that extends Illuminate\Queue\SqsQueue but supports SQS FIFO.

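    <?php
    
    class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
    {
        // Same as the parent pushRaw(), plus the two parameters FIFO queues expect.
        public function pushRaw($payload, $queue = null, array $options = [])
        {
            $response = $this->sqs->sendMessage([
                'QueueUrl' => $this->getQueue($queue),
                'MessageBody' => $payload,
                // SQS only orders messages within a group. uniqid() puts every
                // message in its own group; use a constant ID for strict FIFO.
                'MessageGroupId' => uniqid(),
                // Required unless content-based deduplication is enabled on the queue.
                'MessageDeduplicationId' => uniqid(),
            ]);
    
            return $response->get('MessageId');
        }
    
        // Note: later() is inherited unchanged and sends DelaySeconds per message,
        // which FIFO queues do not support (they only allow a per-queue delay).
    }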
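
If strict ordering across all jobs is the goal, as in the question, the message parameters could be built along the following lines. This is only a sketch: buildFifoMessageParams, the 'default' group ID, and the example queue URL are hypothetical names chosen for illustration, not part of Laravel or the AWS SDK. It uses one shared MessageGroupId so that all messages land in the same ordered group, and derives MessageDeduplicationId from a SHA-256 hash of the payload.

```php
<?php

/**
 * Build sendMessage parameters for a FIFO queue.
 * Illustrative helper only; not part of Laravel or the AWS SDK.
 */
function buildFifoMessageParams(string $queueUrl, string $payload, string $groupId = 'default'): array
{
    return [
        'QueueUrl' => $queueUrl,
        'MessageBody' => $payload,
        // One shared group ID keeps every message in a single ordered group.
        'MessageGroupId' => $groupId,
        // SHA-256 hex digest: 64 characters, deterministic for a given payload.
        'MessageDeduplicationId' => hash('sha256', $payload),
    ];
}

// Placeholder URL; substitute the real FIFO queue URL.
$url = 'https://sqs.example.invalid/123456789012/jobs.fifo';

$first = buildFifoMessageParams($url, '{"job":"A"}');
$second = buildFifoMessageParams($url, '{"job":"B"}');
```

One trade-off to be aware of: because the deduplication ID depends only on the payload, two byte-identical payloads sent within the SQS deduplication interval are treated as the same message and the second is dropped. If that is not wanted, a per-message unique ID (as in the class above) is the safer choice for MessageDeduplicationId, while the group ID can still stay constant.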
    
  2. Create a new SqsFifoConnector class that extends Illuminate\Queue\Connectors\SqsConnector that would establish a connection using SqsFifoQueue.

    <?php
    
    use Aws\Sqs\SqsClient;
    use Illuminate\Support\Arr;
    
    class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
    {
        public function connect(array $config)
        {
            $config = $this->getDefaultConfiguration($config);
    
            if ($config['key'] && $config['secret']) {
                $config['credentials'] = Arr::only($config, ['key', 'secret']);
            }
    
            return new SqsFifoQueue(
                new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
            );
        }
    }
    
  3. Create a new SqsFifoServiceProvider that registers the SqsFifoConnector to Laravel's queue manager.

    <?php
    
    class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider
    {
        public function register()
        {
            $this->app->afterResolving('queue', function ($manager) {
                $manager->addConnector('sqsfifo', function () {
                    return new SqsFifoConnector;
                });
            });
        }
    }
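
To make the role of addConnector a little more concrete, here is a toy, self-contained sketch of the lookup it enables. ToyQueueManager and ToyConnector are hypothetical stand-ins written for this illustration; they are not Laravel classes and only mimic the idea that a driver name maps to a closure that produces a connector.

```php
<?php

// Toy stand-in for a queue manager; illustrative only, not Laravel's implementation.
class ToyQueueManager
{
    /** @var array<string, callable> Driver name => closure returning a connector. */
    private $connectors = [];

    public function addConnector(string $driver, callable $resolver): void
    {
        $this->connectors[$driver] = $resolver;
    }

    public function connect(array $config)
    {
        $driver = $config['driver'];

        if (!isset($this->connectors[$driver])) {
            throw new InvalidArgumentException("No connector for [$driver]");
        }

        // Resolve the connector lazily, then let it build the connection.
        return call_user_func($this->connectors[$driver])->connect($config);
    }
}

// Toy connector that just reports which queue it was asked to connect to.
class ToyConnector
{
    public function connect(array $config)
    {
        return 'connected to ' . $config['queue'];
    }
}

$manager = new ToyQueueManager();
$manager->addConnector('sqsfifo', function () {
    return new ToyConnector();
});

$result = $manager->connect(['driver' => 'sqsfifo', 'queue' => 'jobs.fifo']);
```

The point of the sketch is that the name passed to addConnector has to match the 'driver' value of the connection in config/queue.php; otherwise the lookup fails.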
    
  4. Add SqsFifoServiceProvider to your config/app.php.

    <?php
    
    return [
        'providers'     => [
            ...
            SqsFifoServiceProvider::class,
        ],
    ];
    
  5. Update config/queue.php to use the new SQS FIFO Queue driver.

    <?php
    
    return [
    
        'default' => 'sqsfifo',
    
        'connections' => [
            'sqsfifo' => [
                'driver' => 'sqsfifo',
                'key'    => 'my_key',
                'secret' => 'my_secret',
                'queue'  => 'my_queue_url',
                'region' => 'my_sqs_region',
            ],
        ],
    ];
    

With these pieces in place, the sqsfifo connection should be able to push jobs to SQS FIFO queues.
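
One quick sanity check before pushing jobs: FIFO queue names must end with the .fifo suffix, so the 'queue' value in the connection config should point at such a queue. The helper below is a minimal sketch of that check; looksLikeFifoQueue is a hypothetical name, and the URLs are placeholders.

```php
<?php

/**
 * Return true when the queue name (or the last path segment of a queue URL)
 * ends with ".fifo". Illustrative helper only.
 */
function looksLikeFifoQueue(string $queue): bool
{
    // For a full URL, take the path; for a bare name, fall back to the input.
    $path = parse_url($queue, PHP_URL_PATH) ?: $queue;
    $name = basename($path);

    return substr($name, -5) === '.fifo';
}

$fifoUrl = looksLikeFifoQueue('https://sqs.example.invalid/123456789012/jobs.fifo');
$standardUrl = looksLikeFifoQueue('https://sqs.example.invalid/123456789012/jobs');
$bareName = looksLikeFifoQueue('jobs.fifo');
```

This only inspects the name; it does not contact SQS, so it cannot confirm that the queue actually exists.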

Shameless plug: While working on the steps above I've created a laravel-sqs-fifo composer package to handle this at https://github.com/maqe/laravel-sqs-fifo.

answered Sep 21 '22 14:09 by Unnawut