
Process Amazon SQS messages from queue asynchronously in Java

I am having a hard time figuring out how to process messages from Amazon SQS.

I am trying to implement the following:

  1. Listener on SQS
  2. Process message from queue and add it to DB
  3. Delete processed message from queue

What bothers me most is how to implement step 2. I have two classes, SQSConnector and ProfileDao. For now I want a simple implementation: initialize SQSConnector in ProfileDao and receive messages from the queue. My idea is to start a new thread that polls for messages and, when the queue is empty, interrupt that thread from ProfileDao.

What's the best way of returning/processing the messages (a callback function?) If there is another way of doing this, I am open to options.

Thank you

jasenkoh asked Oct 21 '22 15:10


1 Answer

I've accomplished something similar with SQS using Java's ExecutorService, Future, and ConcurrentLinkedQueue.

The ExecutorService manages a thread pool that runs tasks implementing the Callable interface and returns a Future for each submitted task. As the ExecutorService creates the futures, I push them onto a ConcurrentLinkedQueue; a separate thread polls that queue and processes the results as the futures complete.

Check SQS and start the work asynchronously:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SqsProcessor {

    private static final int THREAD_COUNT = 100;
    private final ExecutorService _executor;
    private final FutureResultProcessor _futureResultProcessor;

    public SqsProcessor() {
        _executor = Executors.newFixedThreadPool(THREAD_COUNT);
        _futureResultProcessor = new FutureResultProcessor();
        // Start the thread that drains the queue of futures
        _futureResultProcessor.start();
    }

    public void waitReceive() {

        // Receive an SQS message. Placeholder: replace the empty string with
        // the message body returned by your SQS client's receive call.
        String sqsMessage = "";

        // Start the work related to the SQS message
        Callable<MyWorkerResult> sqsWorker = new MyWorker(sqsMessage);
        Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker);

        // Send to the queue so the result can be processed when it completes
        _futureResultProcessor.add(sqsFuture);
    }
}

Class that does the work:

import java.util.concurrent.Callable;

public class MyWorker implements Callable<MyWorkerResult> {

    private String _sqsMessage = null;

    public MyWorker(String sqsMessage) {
        _sqsMessage = sqsMessage;
    }

    @Override
    public MyWorkerResult call() throws Exception {
        // Do work relating to the SQS message (_sqsMessage) and
        // populate the result before returning it
        return new MyWorkerResult();
    }
}

Holds the results of the work:

public class MyWorkerResult {
    // Results set in MyWorker call()
}

Thread that uses a ConcurrentLinkedQueue to receive and process the future results:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureResultProcessor extends Thread {

    private static final long CHECK_SLEEP_MS = 300;
    private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue =
            new ConcurrentLinkedQueue<Future<MyWorkerResult>>();

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Future<MyWorkerResult> myFuture = resultQueue.poll();

                if (myFuture == null) {
                    // There's nothing to process; wait before checking again
                    Thread.sleep(CHECK_SLEEP_MS);
                    continue;
                }

                try {
                    // Blocks until this worker has finished
                    MyWorkerResult myWorkerResult = myFuture.get();

                    // Process result
                } catch (ExecutionException e) {
                    // The worker threw an exception; e.getCause() holds it
                }
            }
        } catch (InterruptedException e) {
            // Interrupted while sleeping or waiting: restore the flag and stop
            Thread.currentThread().interrupt();
        }
    }

    public void add(Future<MyWorkerResult> sqsFuture) {
        resultQueue.offer(sqsFuture);
    }
}

Alternatively you could collect a group of futures and wait for them all to finish before processing the results.
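
A minimal sketch of that batch approach, assuming ExecutorService.invokeAll is acceptable for your workload. The class name BatchFutures, the pool size of 4, and the handle method (which just returns the message length) are placeholders for illustration, not part of the code above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchFutures {

    // Hypothetical stand-in for the real work done per SQS message.
    static int handle(String messageBody) {
        return messageBody.length();
    }

    // Submits one task per message, blocks until all of them finish, then
    // returns the results in the same order as the input list.
    public static List<Integer> processBatch(List<String> messageBodies)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String body : messageBodies) {
                tasks.add(() -> handle(body));
            }

            // invokeAll returns only after every task has completed.
            List<Future<Integer>> futures = executor.invokeAll(tasks);

            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                // Each future is already done, so get() does not wait here;
                // it throws ExecutionException if that task failed.
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processBatch(Arrays.asList("first", "second", "third")));
    }
}
```

The trade-off is latency: nothing is processed until the slowest task in the batch finishes, whereas the queue-based version handles results as it reaches them.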

Akka could be a good fit. I haven't used it directly, but it provides a framework for running asynchronous tasks, provides error handling, and could even distribute the tasks to remote instances.
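
Whichever approach you choose, steps 1 and 3 from the question (receive, then delete after processing) still need wiring. Here is a rough sketch of that flow with a callback, as the question suggested. MessageQueue, QueueMessage, and InMemoryQueue are hypothetical stand-ins so the example runs without AWS; they are not the AWS SDK's types. With a real client you would map receive and delete onto its receive-message and delete-message calls, using the receipt handle for the delete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ReceiveProcessDelete {

    // Hypothetical minimal view of a queue; NOT the AWS SDK interface.
    interface MessageQueue {
        List<QueueMessage> receive();       // may return an empty list
        void delete(String receiptHandle);
    }

    static final class QueueMessage {
        final String receiptHandle;
        final String body;

        QueueMessage(String receiptHandle, String body) {
            this.receiptHandle = receiptHandle;
            this.body = body;
        }
    }

    // In-memory stand-in for SQS, used only to make the sketch runnable.
    static final class InMemoryQueue implements MessageQueue {
        private final Map<String, String> messages = new ConcurrentHashMap<>();

        void send(String receiptHandle, String body) {
            messages.put(receiptHandle, body);
        }

        @Override
        public List<QueueMessage> receive() {
            List<QueueMessage> batch = new ArrayList<>();
            for (Map.Entry<String, String> entry : messages.entrySet()) {
                batch.add(new QueueMessage(entry.getKey(), entry.getValue()));
            }
            return batch;
        }

        @Override
        public void delete(String receiptHandle) {
            messages.remove(receiptHandle);
        }

        int size() {
            return messages.size();
        }
    }

    // Receives one batch and runs the handler (the callback) for each message
    // on the pool. A message is deleted only if its handler returns normally.
    static void drainOnce(MessageQueue queue, ExecutorService executor,
                          Consumer<String> handler) {
        for (QueueMessage message : queue.receive()) {
            executor.submit(() -> {
                try {
                    handler.accept(message.body);   // e.g. write to the DB
                    queue.delete(message.receiptHandle);
                } catch (RuntimeException e) {
                    // Leave the message on the queue so it can be retried.
                }
            });
        }
    }

    // Sends three messages, one of which fails, and returns how many remain.
    static int demo() throws InterruptedException {
        InMemoryQueue queue = new InMemoryQueue();
        queue.send("h1", "ok");
        queue.send("h2", "bad");
        queue.send("h3", "ok");

        ExecutorService executor = Executors.newFixedThreadPool(2);
        drainOnce(queue, executor, body -> {
            if (body.equals("bad")) {
                throw new IllegalArgumentException("cannot process: " + body);
            }
        });
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Messages left on queue: " + demo());
    }
}
```

Deleting only after the handler succeeds means a failed message stays on the queue and can be picked up again, at the cost of the handler possibly seeing the same message more than once.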

HypeXR answered Oct 23 '22 12:10