Amazon has announced their new FIFO SQS service and I'd like to use it in Laravel Queue to solve some concurrency issues.
I've created several new queues and changed the configurations. However, I got a MissingParameter error which says:
    The request must contain the parameter MessageGroupId.
So I modified the file vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php:
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $response = $this->sqs->sendMessage([
            'QueueUrl' => $this->getQueue($queue),
            'MessageBody' => $payload,
            'MessageGroupId' => env('APP_ENV', getenv('APP_ENV')),
        ]);

        return $response->get('MessageId');
    }

    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = $this->createPayload($job, $data);
        $delay = $this->getSeconds($delay);

        return $this->sqs->sendMessage([
            'QueueUrl' => $this->getQueue($queue),
            'MessageBody' => $payload,
            'DelaySeconds' => $delay,
            'MessageGroupId' => env('APP_ENV', getenv('APP_ENV')),
        ])->get('MessageId');
    }
I'm using APP_ENV as the group ID (it's a single message queue, so it doesn't really matter; I just want everything to be FIFO).
But I'm still getting the same error message. How can I fix it? Any help would be appreciated.
(By the way, where does the SDK define sendMessage? I can find a stub for it, but I couldn't find the actual implementation.)
To create an Amazon SQS queue (console):
1. Open the Amazon SQS console at https://console.aws.amazon.com/sqs/.
2. Choose Create queue.
3. For Type, the Standard queue type is set by default. To create a FIFO queue, choose FIFO.
You can't convert an existing standard queue into a FIFO queue. To make the move, you must either create a new FIFO queue for your application or delete your existing standard queue and recreate it as a FIFO queue.
In FIFO queues, messages are ordered based on message group ID. If multiple hosts (or different threads on the same host) send messages with the same message group ID to a FIFO queue, Amazon SQS stores the messages in the order in which they arrive for processing.
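The per-group ordering described above can be pictured with a toy model. This is only an illustration in plain PHP, not how SQS works internally; the function name orderByGroup and the sample group IDs are made up for the example.

```php
<?php
// Toy model: messages keep their arrival order *within* each message group.
// orderByGroup() is a hypothetical helper, not part of any SDK.
function orderByGroup(array $messages): array
{
    $groups = [];
    foreach ($messages as [$groupId, $body]) {
        // Appending preserves the arrival order inside the group.
        $groups[$groupId][] = $body;
    }

    return $groups;
}

// Two producers interleave messages for groups "a" and "b".
$arrivals = [['a', 'a1'], ['b', 'b1'], ['a', 'a2'], ['b', 'b2']];
$groups = orderByGroup($arrivals);
```

Within group "a" the order a1, a2 is kept, and likewise for "b", but nothing is promised about how the two groups interleave. If every message needs to be ordered relative to every other, they all have to share one group ID.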
To determine whether a queue is FIFO, you can check whether QueueName ends with the .fifo suffix. The ContentBasedDeduplication attribute returns whether content-based deduplication is enabled for the queue. For more information, see Exactly-once processing in the Amazon SQS Developer Guide.
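The suffix check is easy to do in code. A minimal sketch, assuming you have the queue name or URL as a string; isFifoQueue is a hypothetical helper name, and the URL below is a placeholder.

```php
<?php
// Hypothetical helper: a FIFO queue's name (and therefore its URL) ends in ".fifo".
function isFifoQueue(string $queueNameOrUrl): bool
{
    $suffix = '.fifo';

    // substr() with a negative offset returns the last strlen($suffix) characters.
    return substr($queueNameOrUrl, -strlen($suffix)) === $suffix;
}

$fifo = isFifoQueue('https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo');
$standard = isFifoQueue('orders');
```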
I want to point out to others who might stumble across the same issue that, although editing SqsQueue.php works, the change will easily be reset by a composer install or composer update. An alternative is to implement a new Illuminate\Queue\Connectors\ConnectorInterface for SQS FIFO and then add it to Laravel's queue manager.
My approach is as follows:
1. Create a new SqsFifoQueue class that extends Illuminate\Queue\SqsQueue but supports SQS FIFO.
2. Create a new SqsFifoConnector class that extends Illuminate\Queue\Connectors\SqsConnector that would establish a connection using SqsFifoQueue.
3. Create a new SqsFifoServiceProvider that registers the SqsFifoConnector to Laravel's queue manager.
4. Add SqsFifoServiceProvider to your config/app.php.
5. Update config/queue.php to use the new SQS FIFO Queue driver.

Example:
Create a new SqsFifoQueue class that extends Illuminate\Queue\SqsQueue but supports SQS FIFO.
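    <?php

    class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
    {
        public function pushRaw($payload, $queue = null, array $options = [])
        {
            $response = $this->sqs->sendMessage([
                'QueueUrl' => $this->getQueue($queue),
                'MessageBody' => $payload,
                // Required for FIFO queues. Ordering is only guaranteed within a
                // group, so a unique ID per message means messages are not ordered
                // relative to each other; use a fixed value for strict ordering.
                'MessageGroupId' => uniqid(),
                // Required unless content-based deduplication is enabled on the queue.
                'MessageDeduplicationId' => uniqid(),
            ]);

            return $response->get('MessageId');
        }
    }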
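If strict ordering across the whole queue is the goal, as in the question, the uniqid() group ID above could be swapped for a fixed one. The following is only a sketch of how the sendMessage parameters might be built in that case; buildFifoMessage, its 'default' group ID, and the placeholder queue URL are assumptions for illustration, not part of Laravel or the AWS SDK.

```php
<?php
// Hypothetical helper that builds the parameter array for a FIFO sendMessage call.
function buildFifoMessage(string $queueUrl, string $payload, string $groupId = 'default'): array
{
    return [
        'QueueUrl' => $queueUrl,
        'MessageBody' => $payload,
        // One shared group ID: messages are delivered in the order they were sent.
        'MessageGroupId' => $groupId,
        // SHA-256 of the body (64 hex characters). Identical payloads sent within
        // the 5-minute deduplication window are treated as duplicates by SQS.
        'MessageDeduplicationId' => hash('sha256', $payload),
    ];
}

$params = buildFifoMessage('https://sqs.example/123/my-queue.fifo', '{"job":"SendEmail"}');
```

The trade-off is that hashing the body drops genuinely repeated payloads inside the deduplication window. If the same payload may legitimately be queued twice in quick succession, a unique deduplication ID (as in the class above) is the safer choice.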
Create a new SqsFifoConnector class that extends Illuminate\Queue\Connectors\SqsConnector that would establish a connection using SqsFifoQueue.
    <?php

    use Aws\Sqs\SqsClient;
    use Illuminate\Support\Arr;

    class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
    {
        public function connect(array $config)
        {
            $config = $this->getDefaultConfiguration($config);

            // Pass explicit credentials to the SDK only when both are configured.
            if ($config['key'] && $config['secret']) {
                $config['credentials'] = Arr::only($config, ['key', 'secret']);
            }

            return new SqsFifoQueue(
                new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
            );
        }
    }
Create a new SqsFifoServiceProvider that registers the SqsFifoConnector to Laravel's queue manager.
    <?php

    class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider
    {
        public function register()
        {
            // Register the "sqsfifo" driver once the queue manager has been resolved.
            $this->app->afterResolving('queue', function ($manager) {
                $manager->addConnector('sqsfifo', function () {
                    return new SqsFifoConnector;
                });
            });
        }
    }
Add SqsFifoServiceProvider to your config/app.php.
    <?php

    return [
        'providers' => [
            // ...
            SqsFifoServiceProvider::class,
        ],
    ];
Update config/queue.php to use the new SQS FIFO Queue driver.
    <?php

    return [
        'default' => 'sqsfifo',
        'connections' => [
            'sqsfifo' => [
                'driver' => 'sqsfifo',
                'key' => 'my_key',
                'secret' => 'my_secret',
                'queue' => 'my_queue_url',
                'region' => 'my_sqs_region',
            ],
        ],
    ];
Then your queue should now support SQS FIFO Queues.
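If you would rather not hard-code credentials in config/queue.php, the same connection can read them from the environment with Laravel's env() helper. This is a config fragment only, and the variable names (SQS_KEY, SQS_SECRET, SQS_QUEUE_URL, SQS_REGION) are made up for the example; use whatever names your .env file already defines.

```php
<?php

return [
    'default' => 'sqsfifo',
    'connections' => [
        'sqsfifo' => [
            'driver' => 'sqsfifo',
            // Hypothetical environment variable names; adjust to your .env file.
            'key' => env('SQS_KEY'),
            'secret' => env('SQS_SECRET'),
            'queue' => env('SQS_QUEUE_URL'),
            'region' => env('SQS_REGION'),
        ],
    ],
];
```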
Shameless plug: While working on the steps above I've created a laravel-sqs-fifo composer package to handle this at https://github.com/maqe/laravel-sqs-fifo.