
Using Amazon SQS in a @MessageDriven bean - pooling / parallel processing

We need to use queues in our Java EE application, and since it is a cloud-based application (deployed on OpenShift Online), we would like to use Amazon SQS.

If I understand the theory of the receiving side of JMS / Java EE correctly, a @MessageDriven bean is managed by the Java EE container, so that many bean instances are created in parallel (up to the max pool size) when the number of incoming messages is high. This is of course a big benefit for processing high loads.
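
To make the pooling idea concrete, here is a minimal sketch in plain java.util.concurrent. It is not a real @MessageDriven bean and not how any container implements its pool; the class name PoolSizeSketch and the method peakConcurrency are made up for illustration. A fixed thread pool stands in for the bean pool, and the method reports how many messages were being handled at the same time at the busiest moment.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only: a fixed thread pool plays the role of the MDB pool.
final class PoolSizeSketch {

    /**
     * Handles every message on at most maxPoolSize worker threads and returns
     * the highest number of messages that were in flight at the same time.
     */
    static int peakConcurrency(List<String> messages, int maxPoolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(maxPoolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (String message : messages) {
            pool.execute(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the busiest moment
                try {
                    Thread.sleep(5); // stand-in for real message handling
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }

        pool.shutdown(); // accept no new work, let queued work finish
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }
}
```

Calling PoolSizeSketch.peakConcurrency with 20 messages and a pool size of 4 never returns more than 4, which is the kind of bounded parallelism I expect from the container.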

However, I do not see how we can integrate AWS SQS this way in a Java EE application. I know the asynchronous receiver example from http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

class MyListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

and then:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();

This is the official asynchronous receiver example, which is not a @MessageDriven bean. Obviously we have to provide the credentials for authentication somewhere (by creating an SQSConnectionFactory, then a connection, then a session, which is also well described in the example).
But I strongly suspect that this example will not process the messages in parallel, i.e. only one listener instance is processing the queue, and this is not a good solution for scalable, heavily loaded applications.

a) How can we go the real Java EE way with Amazon SQS? I only find plenty of Spring examples. But it must be Java EE 7.

b) We use WildFly (currently 8.2.1). Would it also be possible to let WildFly manage the connection to AWS, so that inside the application we could use the queue as if it were an application-server-managed queue (the same approach as data sources for DB access)?

Conclusion after getting an answer from stdunbar:
It seems that what I would like to do is not possible in a 'proper' way. So what should I do? Implement a ManagedExecutorService, as stdunbar described, to 'wrap' the queue? However, this implies having a local queue as well, which is not a good situation for an application that should be scalable. What about alternatives? We are running the application on OpenShift Online. It would probably be better to instantiate a separate gear with e.g. an Apache ActiveMQ cartridge... but there are of course a lot of disadvantages, such as the costs and the fact that we are responsible for the 'infrastructure'.
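
For reference, the 'wrap the queue with an executor' idea can be sketched roughly as follows. This is only my reading of the approach, written in plain Java so that it runs without a container: TextListener is a made-up stand-in for javax.jms.MessageListener, and a fixed thread pool stands in for the ManagedExecutorService that a Java EE 7 application would normally obtain from the container. All class and method names here are hypothetical.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ExecutorHandOffSketch {

    /** Hypothetical stand-in for javax.jms.MessageListener, reduced to text payloads. */
    interface TextListener {
        void onMessage(String text);
    }

    /** Listener that returns immediately and leaves the real work to a worker pool. */
    static final class HandOffListener implements TextListener {
        private final ExecutorService workers;
        private final Consumer<String> handler;

        HandOffListener(ExecutorService workers, Consumer<String> handler) {
            this.workers = workers;
            this.handler = handler;
        }

        @Override
        public void onMessage(String text) {
            // The executor's internal work queue is the "local queue" mentioned above.
            workers.execute(() -> handler.accept(text));
        }
    }

    /** Feeds the texts through a hand-off listener and returns how many were handled. */
    static int run(List<String> texts, int poolSize) {
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        Queue<String> handled = new ConcurrentLinkedQueue<>();
        TextListener listener = new HandOffListener(workers, handled::add);

        for (String text : texts) {
            listener.onMessage(text); // one delivery thread, many workers
        }

        workers.shutdown();
        try {
            if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.size();
    }
}
```

One caveat with this design: with Session.AUTO_ACKNOWLEDGE a JMS message is acknowledged once onMessage returns, so a message that has been handed off but not yet processed would be lost if the server goes down. That is part of why the local queue worries me.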

To be honest, I am really disappointed with AWS in this case...

badera asked Dec 15 '16 19:12


1 Answer

I don't think that my solution is proper Java EE, but in my case it works.

Configuration:

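import javax.annotation.PostConstruct;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.inject.Inject;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;

import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;

// Assumption: @Singleton is the EJB annotation (javax.ejb), so @Startup and
// @PostConstruct let the container call init() once at deployment. If init()
// is triggered from somewhere else, drop those two annotations.
@Singleton
@Startup
public class SqsMessageManager
{
    private Integer numberOfReceivers = 3;

    public static SQSConnection connection = null;
    public static Queue queue = null;

    @Inject
    SqsMessageReceiver sqsMessageReceiver;

    @PostConstruct
    public void init()
    {
        try
        {
            SQSConnectionFactory connectionFactory =
                    SQSConnectionFactory.builder()
                            .withRegion(Region.getRegion(Regions.EU_WEST_1))
                            .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
                            .build();

            connection = connectionFactory.createConnection();

            queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");

            // one session and consumer per receiver, so messages can be handled in parallel
            for (int i = 0; i < numberOfReceivers; i++)
            {
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);
            }

            // start delivery only after all listeners are registered
            connection.start();
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
}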

Then the sender:

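import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

@Dependent
public class SqsMessageSender
{
    MessageProducer producer = null;
    Session senderSession = null;

    @PostConstruct
    public void createProducer(){
        // the connection only exists after SqsMessageManager.init() has run
        if (SqsMessageManager.connection == null) {
            System.err.println("SqsMessageManager is not initialised, no producer created");
            return;
        }
        try
        {
            // open new session and message producer
            senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = senderSession.createProducer(SqsMessageManager.queue);
        }catch(JMSException e){
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy(){
        try
        {
            // close producer and session, if they were ever opened
            if (producer != null) {
                producer.close();
            }
            if (senderSession != null) {
                senderSession.close();
            }
        }catch(JMSException e){
            e.printStackTrace();
        }
    }

    // sends a message to aws sqs queue
    public void sendMessage(String txt)
    {
        if (producer == null) {
            throw new IllegalStateException("No producer available, see createProducer()");
        }
        try
        {
            TextMessage textMessage = senderSession.createTextMessage(txt);
            producer.send(textMessage);
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
}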

And the receiver:

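import javax.enterprise.context.Dependent;
import javax.jms.Message;
import javax.jms.MessageListener;

@Dependent
public class SqsMessageReceiver implements MessageListener
{
    // The same instance is registered on every consumer session in SqsMessageManager,
    // so this method may run on several threads at once.
    @Override
    public void onMessage(Message inMessage) {
        ...
    }
}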
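
Because the manager registers one injected receiver on all three consumer sessions, onMessage may be called from several delivery threads at the same time, at least as far as I understand how JMS sessions deliver messages. The following is a minimal sketch in plain Java of what that means for the receiver's state. CountingReceiver and deliver are hypothetical names, and ordinary threads stand in for the session delivery threads.

```java
import java.util.concurrent.atomic.AtomicLong;

final class SharedListenerSketch {

    /** Hypothetical stand-in for the shared receiver; it keeps only thread-safe state. */
    static final class CountingReceiver {
        private final AtomicLong received = new AtomicLong();

        void onMessage(String text) {
            // AtomicLong stays correct under concurrent calls; a plain long field would not
            received.incrementAndGet();
        }

        long received() {
            return received.get();
        }
    }

    /** Simulates `sessions` delivery threads that each push `perSession` messages. */
    static long deliver(int sessions, int perSession) {
        CountingReceiver receiver = new CountingReceiver();
        Thread[] threads = new Thread[sessions];

        for (int i = 0; i < sessions; i++) {
            threads[i] = new Thread(() -> {
                for (int n = 0; n < perSession; n++) {
                    receiver.onMessage("msg");
                }
            });
            threads[i].start();
        }

        for (Thread thread : threads) {
            try {
                thread.join(); // wait until this simulated session has delivered everything
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return receiver.received();
    }
}
```

In the real receiver the same rule applies: keep per-message data in local variables and use thread-safe types for anything that is shared between calls.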
Simon Schüpbach answered Sep 28 '22 16:09