Best way to move messages off DLQ in Amazon SQS?

People also ask

How do I redrive DLQ messages?

In the Amazon SQS console, select the DLQ and choose Start DLQ redrive. SQS lets you redrive messages either to their source queue(s) or to a custom destination queue. Choose Redrive to source queue(s), which is the default.
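The same redrive can be started from code. SQS exposes it as the StartMessageMoveTask API (`start_message_move_task` in boto3), with `list_message_move_tasks` for checking progress. Below is a minimal sketch, assuming `sqs` is a boto3 SQS client and you already know the DLQ's ARN; the helper names are hypothetical.

```python
def start_dlq_redrive(sqs, dlq_arn, destination_arn=None, max_per_second=None):
    """Start a DLQ redrive (message move task) and return its task handle.

    Without destination_arn, SQS moves the messages back to the source
    queue(s) they originally came from, like the console default.
    """
    params = {"SourceArn": dlq_arn}
    if destination_arn is not None:
        params["DestinationArn"] = destination_arn
    if max_per_second is not None:
        # Optional throttle; when omitted, SQS chooses the rate itself
        params["MaxNumberOfMessagesPerSecond"] = max_per_second
    return sqs.start_message_move_task(**params)["TaskHandle"]


def latest_redrive_status(sqs, dlq_arn):
    """Return the status of the most recent move task for this DLQ, or None."""
    results = sqs.list_message_move_tasks(SourceArn=dlq_arn, MaxResults=1)["Results"]
    return results[0]["Status"] if results else None
```

Usage would look like `start_dlq_redrive(boto3.client("sqs"), dlq_arn)`, then polling `latest_redrive_status` until it stops reporting a running task.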

How do I empty a dead-letter queue in AWS?

To purge a queue, log in to the AWS Management Console and choose Amazon SQS. Then, select a queue, and choose “Purge Queue” from the Queue Actions menu. The queue will then be cleared of all messages. You can also purge queues using the AWS SDKs or command-line tools.
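From an SDK, purging is a single `purge_queue` call. The sketch below, assuming `sqs` is a boto3 SQS client, also reads the approximate message count first so you can log what was discarded; the helper name is hypothetical.

```python
def purge_queue(sqs, queue_url):
    """Purge a queue and return the approximate number of messages it held.

    Purging cannot be undone and the deletion can take up to 60 seconds.
    SQS allows one purge per queue every 60 seconds; calling it again sooner
    raises a PurgeQueueInProgress error.
    """
    attrs = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]
    )["Attributes"]
    count = int(attrs["ApproximateNumberOfMessages"])  # SQS returns counts as strings
    sqs.purge_queue(QueueUrl=queue_url)
    return count
```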

What is the AWS recommended way of managing large messages in SQS?

To manage large Amazon Simple Queue Service (Amazon SQS) messages, you can use Amazon Simple Storage Service (Amazon S3) and the Amazon SQS Extended Client Library for Java. This is especially useful for storing and consuming messages up to 2 GB.
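The Extended Client Library is Java, but the underlying idea (store the payload in S3 and send a small pointer through SQS) can be sketched by hand in Python. This is a manual claim-check sketch, not the library's wire format: the attribute name, key prefix, and size threshold are hypothetical, and `sqs`/`s3` are assumed to be boto3 clients.

```python
import json
import uuid

# Hypothetical attribute name marking messages whose body lives in S3
CLAIM_CHECK_ATTR = "claim-check"


def send_possibly_large(sqs, s3, queue_url, bucket, body, threshold_bytes):
    """Send body directly if it is small enough, otherwise via an S3 pointer."""
    data = body.encode("utf-8")
    if len(data) <= threshold_bytes:
        return sqs.send_message(QueueUrl=queue_url, MessageBody=body)
    key = f"sqs-payloads/{uuid.uuid4()}"  # hypothetical key prefix
    s3.put_object(Bucket=bucket, Key=key, Body=data)
    return sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({"bucket": bucket, "key": key}),
        MessageAttributes={CLAIM_CHECK_ATTR: {"DataType": "String", "StringValue": "s3"}},
    )


def read_possibly_large(s3, message):
    """Return the real body of a message received with MessageAttributeNames=['All']."""
    if CLAIM_CHECK_ATTR not in message.get("MessageAttributes", {}):
        return message["Body"]
    ref = json.loads(message["Body"])
    obj = s3.get_object(Bucket=ref["bucket"], Key=ref["key"])
    return obj["Body"].read().decode("utf-8")
```

Unlike the library, this sketch never deletes the S3 objects; you would need a lifecycle rule or an explicit delete after processing.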

How do I get messages from a DLQ?

To receive messages from a DLQ in Service Bus Explorer (a tool for Azure Service Bus, not SQS), click the queue, open the “Deadletter” tab, and click “Receive and Delete” in the dialog box that pops up. The default is Top10, so the first 10 messages are received from the DLQ.


Here is a quick hack. This is definitely not the best or recommended option.

  1. Set the main SQS queue as the dead-letter queue of the actual DLQ, with Maximum receives set to 1.
  2. View the messages in the DLQ. Because the main queue is now the DLQ's own dead-letter queue, receiving them moves them to the main queue.
  3. Remove the setting so that the main queue is no longer the DLQ of the actual DLQ.
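Setting up and undoing this hack from code comes down to the DLQ's `RedrivePolicy` attribute; the viewing step is still done in the console. A rough sketch, assuming `sqs` is a boto3 SQS client and both queue URLs are known (the helper names are hypothetical):

```python
import json


def queue_arn(sqs, queue_url):
    """Look up a queue's ARN from its URL."""
    attrs = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])
    return attrs["Attributes"]["QueueArn"]


def point_dlq_at_main_queue(sqs, dlq_url, main_queue_url):
    """Make the main queue the DLQ's own dead-letter queue, with maxReceiveCount 1."""
    policy = {"deadLetterTargetArn": queue_arn(sqs, main_queue_url), "maxReceiveCount": "1"}
    sqs.set_queue_attributes(QueueUrl=dlq_url, Attributes={"RedrivePolicy": json.dumps(policy)})


def clear_redrive_policy(sqs, dlq_url):
    """Undo the hack once the DLQ is empty.

    Assumption: setting RedrivePolicy to an empty string removes it; verify
    this against the SetQueueAttributes documentation before relying on it.
    """
    sqs.set_queue_attributes(QueueUrl=dlq_url, Attributes={"RedrivePolicy": ""})
```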

There are a few scripts out there that do this for you:

  • npm / nodejs based: http://github.com/garryyao/replay-aws-dlq
# install
npm install replay-aws-dlq;

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
  • go based: https://github.com/mercury2269/sqsmover
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url] 

You don't need to move the messages, and moving them brings many other challenges, such as duplicate messages, recovery scenarios, lost messages, and de-duplication checks.

Here is the solution we implemented:

Usually, we use the DLQ for transient errors, not for permanent errors, so we took the following approach:

  1. Read the messages from the DLQ as you would from a regular queue.

    Benefits
    • Avoids duplicate message processing.
    • Better control over the DLQ. For example, we added a check so the DLQ is processed only after the regular queue has been completely processed.
    • The DLQ consumer can be scaled based on the number of messages in the DLQ.
  2. Then run the same code that the regular queue uses. This is more reliable if the job is aborted or the process dies while processing (e.g. the instance is killed).

    Benefits
    • Code reusability
    • Error handling
    • Recovery and message replay
  3. Extend the message visibility so that no other thread processes the message.

    Benefit
    • Avoids multiple threads processing the same record.
  4. Delete the message only when processing succeeds or hits a permanent error.

    Benefit
    • Messages that fail with a transient error keep being retried.
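The approach above (drain the regular queue first, reuse the normal handler, extend visibility, delete only on success or permanent failure) could be sketched as follows. This is an illustration under assumptions, not the answer's actual code: `sqs` is a boto3 SQS client, `handler` is your existing message-processing function, and `PermanentError` is a hypothetical exception the handler raises when retrying cannot help.

```python
class PermanentError(Exception):
    """Hypothetical: raised by the handler when a retry can never succeed."""


def main_queue_is_drained(sqs, main_queue_url):
    """True when the regular queue has no visible or in-flight messages."""
    attrs = sqs.get_queue_attributes(
        QueueUrl=main_queue_url,
        AttributeNames=["ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"],
    )["Attributes"]
    return int(attrs["ApproximateNumberOfMessages"]) == 0 and int(
        attrs["ApproximateNumberOfMessagesNotVisible"]) == 0


def process_dlq_batch(sqs, dlq_url, main_queue_url, handler, visibility_seconds=300):
    """Process one batch from the DLQ; return how many messages were deleted."""
    if not main_queue_is_drained(sqs, main_queue_url):
        return 0  # only touch the DLQ once the regular queue is done
    resp = sqs.receive_message(QueueUrl=dlq_url, MaxNumberOfMessages=10, WaitTimeSeconds=10)
    deleted = 0
    for m in resp.get("Messages", []):
        # Extend visibility so no other worker picks the message up meanwhile
        sqs.change_message_visibility(
            QueueUrl=dlq_url, ReceiptHandle=m["ReceiptHandle"], VisibilityTimeout=visibility_seconds)
        try:
            handler(m["Body"])  # same code path as the regular queue
        except PermanentError:
            pass  # permanent failure: log/archive as needed, then delete below
        except Exception:
            continue  # transient failure: leave it; it reappears after the timeout
        sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=m["ReceiptHandle"])
        deleted += 1
    return deleted
```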

That looks like your best option. Your process could fail after sending a message to the new queue but before deleting it from the DLQ. In that case you'll end up copying the message twice, but your application should be handling redelivery of messages anyway (or not care).


I wrote a small Python script to do this using the boto3 library:

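import boto3

conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",   # URL of the queue to read from (e.g. the DLQ)
  "writer-sqs-queue": "",   # URL of the queue to write to
  "message-group-id": ""    # only needed when the writer queue is FIFO
}

# Empty keys become None, so boto3 falls back to its default credential chain
client = boto3.client(
    'sqs',
    aws_access_key_id=conf.get('sqs-access-key') or None,
    aws_secret_access_key=conf.get('sqs-secret-key') or None
)

while True:
    messages = client.receive_message(
        QueueUrl=conf['reader-sqs-queue'],
        MaxNumberOfMessages=10,
        WaitTimeSeconds=10,
        MessageAttributeNames=['All'],  # so custom attributes are carried over
    )

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            params = {'QueueUrl': conf['writer-sqs-queue'], 'MessageBody': m['Body']}
            if m.get('MessageAttributes'):
                params['MessageAttributes'] = m['MessageAttributes']
            # Only pass a group ID when one is configured (FIFO writer queue)
            if conf.get('message-group-id'):
                params['MessageGroupId'] = conf['message-group-id']
            ret = client.send_message(**params)
            print(ret)
            # Delete only after send_message succeeded (it raises on failure)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break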

You can get this script at this link.

This script can move messages between any two queues, and it supports FIFO queues because you can supply the message-group-id field. For a FIFO writer queue, either enable content-based deduplication on it or also pass a MessageDeduplicationId.
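The script sends every message with one fixed message-group-id. When both queues are FIFO and you want to keep each message's original group, a variation is to build the `send_message` arguments from the received message's own system attributes. A minimal sketch, assuming the message was received with its `MessageGroupId` and `MessageDeduplicationId` attributes requested (e.g. `AttributeNames=['MessageGroupId', 'MessageDeduplicationId']` on `receive_message`); the helper name is hypothetical.

```python
def fifo_send_params(message, dest_queue_url):
    """Build send_message kwargs that keep a FIFO message's own group and dedup IDs."""
    attrs = message.get("Attributes", {})
    params = {
        "QueueUrl": dest_queue_url,
        "MessageBody": message["Body"],
        "MessageGroupId": attrs["MessageGroupId"],
        # Fall back to the original MessageId so a retried move within the
        # deduplication interval is not enqueued twice
        "MessageDeduplicationId": attrs.get("MessageDeduplicationId", message["MessageId"]),
    }
    if message.get("MessageAttributes"):
        params["MessageAttributes"] = message["MessageAttributes"]
    return params
```

Inside the script's loop, `client.send_message(**fifo_send_params(m, conf['writer-sqs-queue']))` would then replace the fixed-group send.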