I'm implementing messenger in company which I work for. I found problem with routing key.
I want to to send one message to two queues. Two other apps will process this queues. Everything works well, but I found problem when handler throws an exception. It doubles message sending one it two retry queues, because retry queues are matching by binding key, which is the same for this queues.
Finally with 3 retries I have 16 messages on my dlqs. Could you help me with this problem? Is it possible to create retry strategy based maybe on queue, not routing key?
My config looks like:
messenger:
failure_transport: failed
default_bus: command.bus
transports:
async:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v1:
binding_keys:
- first
create_miniature_v2:
binding_keys:
- first
failed:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
exchange:
name: olimp_dead
type: topic
queues:
create_miniature_v1_dlq:
binding_keys:
- first
create_miniature_v2_dlq:
binding_keys:
- first
routing:
'Olimp\Messenger\TestEvent': async
buses:
command.bus:
middleware:
- Olimp\Shared\Application\Message\Middleware\EventDispatcher
- doctrine_close_connection
- doctrine_transaction
event.bus:
default_middleware: allow_no_handlers
query.bus: ~
I dispatch event with stamp like that:
class MessengerTestCommand extends Command
{
protected static $defaultName = 'app:messenger-test';
private MessageBusInterface $bus;
public function __construct(MessageBusInterface $bus)
{
$this->bus = $bus;
parent::__construct();
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$this->bus->dispatch(
new TestEvent(), [
new AmqpStamp('first')
]
);
$io->success('Done');
return 0;
}
}
Handler:
class TestEventHandler implements MessageHandlerInterface
{
public function __invoke(TestEvent $event)
{
dump($event->id);
throw new \Exception('Boom');
}
}
What I found on rabbit:
Now I was trying config like that:
framework:
messenger:
failure_transport: failed
default_bus: command.bus
transports:
async:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v1:
binding_keys:
- first
async1:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v2:
binding_keys:
- first
failed:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
exchange:
name: olimp_dead
type: topic
queues:
create_miniature_v1_dlq:
binding_keys:
- first
create_miniature_v2_dlq:
binding_keys:
- first
routing:
'Olimp\Messenger\TestEvent': [async, async1]
and with two running console commands:
bin/console messenger:consume async
bin/console messenger:consume async1
But it works the same.
Ok, I found answer myself.
I created new retry strategy. I changed queue_name_pattern
to %routing_key%_%delay%
and created my own SendFailedMessageForRetryListener
. To retry envelope I added stamp new AmqpStamp($envelope->last(AmqpReceivedStamp::class)->getQueueName())
which is used to create proper routing key for delay queue. So instead of creating queue based on exchange name I have it created based on queue name.
Two more things:
Binding keys in queue looks like:
queues:
create_miniature_v1:
binding_keys:
- create_miniature_v1
- first
create_miniature_v2:
binding_keys:
- create_miniature_v2
- first
and failed queues:
queues:
create_miniature_v1_dlq:
binding_keys:
- create_miniature_v1
create_miniature_v2_dlq:
binding_keys:
- create_miniature_v2
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With