I am having hard time figuring out how to process messages from Amazon SQS.
I am trying to implement following:
What bothers me a lot is how to implement step 2. I have class SQSConnector
and ProfileDao
. Right now I want simple implementation, by initializing SQSConnector
in ProfileDao
and receiving messages from queue. My idea is to start new thread, start polling messages and when queue is empty interrupt the thread from ProfileDao
.
What's the best way of returning/processing messages (callback function?), and if there is another way of doing this I am open for options.
Thank you
I've accomplished something similar with SQS using Java's ExecutorService, Future, and the ConcurrentLinkedQueue.
The ExecutorService creates a thread pool that can execute classes that implement the Callable interface and returns a Future. As the ExecutorService creates the futures I push them onto a ConcurrentLinkedQueue that runs in a thread and processes the results as the futures complete.
Implement checking SQS and starting the work asynchronously:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class SqsProcessor {
private static final int THREAD_COUNT = 100;
private ExecutorService _executor = null;
private FutureResultProcessor futureResultProcessor = null;
public SqsProcessor() {
_executor = Executors.newFixedThreadPool(THREAD_COUNT);
_futureResultProcessor = new FutureResultProcessor();
}
public void waitReceive() {
// Receive a SQS message
// Start the work related to the SQS message
Callable<MyWorkderResult> sqsWorker = new MyWorker(sqsMessage);
Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker);
// Send to the queue so the result can be processed when it completes
_futureResultProcessor.add(sqsFuture);
}
}
Class that does the work:
import java.util.concurrent.Callable;
public class MyWorker implements Callable<MyWorkerResult> {
private String _sqsMessage = null;
public MyWorker(String sqsMessage) {
_sqsMessage = sqsMessage;
}
@Override
public MyWorkerResult call() throws Exception {
// Do work relating to the SQS message
}
}
Holds the results of the work:
public class MyWorkerResult {
// Results set in MyWorker call()
}
ConcurrentLinkedQueue to receive and process the future results:
import java.util.concurrent.Future;
import java.util.concurrent.ConcurrentLinkedQueue;
public class FutureResultProcessor extends Thread {
private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>();
private final Integer CHECK_SLEEP = 300;
public FutureResultProcessor() {
}
public void run() {
while(true) {
Future<MyWorkerResult> myFuture = resultQueue.poll();
if(myFuture == null) {
// There's nothing to process
try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {}
continue;
}
// Process result
if(myFuture != null) {
MyFutureResult myFutureResult = myFuture.get();
// Process result
}
}
}
public void add(Future<MyWorkerResult> sqsFuture) {
resultQueue.offer(sqsFuture);
}
}
Alternatively you could collect a group of futures and wait for them all to finish before processing the results.
Akka could be a good fit. I haven't used it directly, but it provides a framework for running asynchronous tasks, provides error handling, and could even distribute the tasks to remote instances.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With