 

Slow message consumption using AmazonSQSClient

So, I set the Spring JMS listener concurrency to 50-100 and allowed up to 200 max connections. Everything works as expected until I try to consume 100k messages: there are 100k messages in my SQS queue, and I am reading them through the normal Spring JMS approach.

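private final AtomicInteger count = new AtomicInteger(); // listener runs on many threads

@JmsListener(destination = "my-queue") // placeholder queue name
public void process(String message) {
    System.out.println(count.incrementAndGet());
    // code
}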
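Since the container runs this listener on 50-100 threads at once, a plain `int` incremented with `count++` can lose updates. Below is a minimal stdlib sketch (the class and method names are hypothetical, and a fixed thread pool stands in for the listener container) showing that an `AtomicInteger` gives an exact total under concurrency:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CounterDemo {
    // Shared by all worker threads, like the listener's count field
    static final AtomicInteger count = new AtomicInteger();

    // Simulates `messages` listener invocations spread over `threads` workers
    static int consume(int threads, int messages) {
        count.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages; i++) {
            pool.execute(count::incrementAndGet); // atomic, unlike count++ on an int
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(consume(100, 17_000)); // 17000
    }
}
```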

I can see all the logs in my console, but after around 17k messages it starts throwing exceptions.

Something like: AWS SDK exception: port already in use.

Why do I see this exception, and how do I get rid of it?

I searched online but couldn't find anything.

My settings:

Concurrency: 50-100

Messages per task: 50

Acknowledge mode: client acknowledge

timestamp=10:27:57.183, level=WARN , logger=c.a.s.j.SQSMessageConsumerPrefetch, message={ConsumerPrefetchThread-30} Encountered exception during receive in ConsumerPrefetch thread,
javax.jms.JMSException: AmazonClientException: receiveMessage.
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:422)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:339)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:248)
    at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:207)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Address already in use: connect

Update: I looked into the problem, and it seems that new sockets keep being created until all available sockets are exhausted.

My Spring JMS version is 4.3.10.

To replicate this problem, use the configuration above with max connections set to 200 and concurrency set to 50-100, and push about 40k messages to the SQS queue. You can use https://github.com/adamw/elasticmq as a local server that emulates Amazon SQS. Once that is set up, comment out the @JmsListener annotation (so nothing consumes from the queue) and use SoapUI load testing to call the send-message operation and fire many messages. Once you have sent 40k messages, stop. Then uncomment @JmsListener and restart the server.

Update:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setErrorHandler(Throwable::printStackTrace);
    factory.setConcurrency("50-100");
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

Update:

SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);

Update:

Client configuration details:

Protocol: HTTP
Max connections: 200

Update:

I tried the CachingConnectionFactory class, but I read on Stack Overflow and in the official documentation that CachingConnectionFactory should not be combined with DefaultJmsListenerContainerFactory.

https://stackoverflow.com/a/21989895/5871514

It still gives the same error I got before, though.

Update:

My goal is 500 TPS, i.e. I should be able to consume that many messages per second. With this approach I can reach 100-200 TPS, but not more than that. Plus, this issue becomes a blocker at high concurrency. If you have a better solution to achieve this, I am all ears.

Update:

I am using AmazonSQSClient.

Tilak Raj asked Dec 11 '18 15:12




1 Answer

Starvation on the Consumer

One optimization that JMS clients tend to implement is a message consumption buffer, or "prefetch". This buffer is sometimes tunable by the number of messages or by a buffer size in bytes.

The intention is to pull multiple messages in a batch, rather than having the consumer go to the server every single time it receives a message.
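As a rough illustration of the round trips saved: an SQS ReceiveMessage call returns at most 10 messages. The sketch below (hypothetical names, and it assumes every call returns a full batch, which real queues do not guarantee) counts the receive calls needed to drain the 100k messages from the question:

```java
class RoundTrips {
    // Receive calls needed to drain `messages` if each call returns up to `batchSize`
    static long receiveCalls(long messages, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        return (messages + batchSize - 1) / batchSize; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(receiveCalls(100_000, 1));  // 100000
        System.out.println(receiveCalls(100_000, 10)); // 10000
    }
}
```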

In an environment where you have many "fast consumers" (which is the opinionated view these libraries may take), this prefetch is set to a somewhat high default in order to minimize these round trips.

However, in an environment with slow message consumers, this prefetch can be a problem. A slow consumer holds on to the messages it has prefetched, so faster consumers cannot process them even when they are idle. In a highly concurrent environment, this can cause starvation quickly.
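To make the starvation effect concrete, here is a toy, tick-based model (not how the SQS JMS library's prefetch thread is actually implemented; all names are hypothetical). Each consumer needs a fixed number of ticks per message and may hold up to `prefetch` extra messages in a local buffer, where 0 means it only takes a message from the shared queue when idle:

```java
class PrefetchModel {
    // Returns how many ticks it takes to finish `messages` when consumer i
    // needs ticksPerMessage[i] ticks per message and may buffer `prefetch` extras.
    static int ticksToDrain(int messages, int[] ticksPerMessage, int prefetch) {
        int n = ticksPerMessage.length;
        if (n == 0 && messages > 0) throw new IllegalArgumentException("no consumers");
        for (int t : ticksPerMessage) {
            if (t < 1) throw new IllegalArgumentException("ticksPerMessage must be >= 1");
        }
        int queue = messages;           // messages still on the shared queue
        int[] buffered = new int[n];    // prefetched but not yet started
        int[] remaining = new int[n];   // ticks left on the current message (0 = idle)
        int done = 0;
        int tick = 0;
        while (done < messages) {
            tick++;
            for (int i = 0; i < n; i++) {
                if (remaining[i] == 0) {                 // idle: start the next message
                    if (buffered[i] > 0) {
                        buffered[i]--;
                        remaining[i] = ticksPerMessage[i];
                    } else if (queue > 0) {
                        queue--;
                        remaining[i] = ticksPerMessage[i];
                    }
                }
                int take = Math.min(prefetch - buffered[i], queue); // top up local buffer
                if (take > 0) {
                    buffered[i] += take;
                    queue -= take;
                }
            }
            for (int i = 0; i < n; i++) {                // one tick of work per busy consumer
                if (remaining[i] > 0 && --remaining[i] == 0) {
                    done++;
                }
            }
        }
        return tick;
    }

    public static void main(String[] args) {
        int[] speeds = {1, 10};                          // one fast consumer, one 10x slower
        System.out.println(ticksToDrain(20, speeds, 10)); // 90
        System.out.println(ticksToDrain(20, speeds, 0));  // 20
    }
}
```

With a prefetch of 10, the slow consumer grabs 8 messages up front and the fast one sits idle while those wait, so draining takes 90 ticks instead of 20.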

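For this case, the SQS JMS provider lets you tune the prefetch through its ProviderConfiguration, which is passed to SQSConnectionFactory:

ProviderConfiguration providerConfiguration = new ProviderConfiguration();
providerConfiguration.setNumberOfMessagesToPrefetch(0);
SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(providerConfiguration, amazonSQSclient);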

Starvation on the Producer (i.e. via JmsTemplate)

It's very common for these JMS implementations to expect to be accessed through some intermediary that sits between your code and the broker. These intermediaries cache and reuse connections or use a pooling mechanism to reuse them. In the Java EE world, this is usually taken care of by a JCA adapter or another mechanism on the Java EE server.

Because of the way Spring JMS works, it expects an intermediary delegate for the ConnectionFactory to exist to do this caching/pooling. Otherwise, when Spring JMS wants to connect to the broker, it will attempt to open a new connection and session (!) every time you want to do something with the broker.
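A stdlib-only sketch of why this matters (the `CountingFactory` and `Connection` types are hypothetical stand-ins, not Spring or SQS classes): opening a connection per operation creates one connection per message, while a caching delegate opens one and reuses it:

```java
class ConnectionReuseDemo {
    // Hypothetical stand-in for a broker connection
    static final class Connection {
        void send(String msg) { /* stand-in for a broker round trip */ }
    }

    // Hypothetical factory that counts how many connections it has opened
    static final class CountingFactory {
        int opened = 0;

        Connection open() {
            opened++;
            return new Connection();
        }
    }

    static int opensWithoutCaching(int operations) {
        CountingFactory factory = new CountingFactory();
        for (int i = 0; i < operations; i++) {
            factory.open().send("msg-" + i); // new connection per operation
        }
        return factory.opened;
    }

    static int opensWithCaching(int operations) {
        CountingFactory factory = new CountingFactory();
        Connection shared = null;
        for (int i = 0; i < operations; i++) {
            if (shared == null) {
                shared = factory.open();     // opened once, then reused
            }
            shared.send("msg-" + i);
        }
        return factory.opened;
    }

    public static void main(String[] args) {
        System.out.println(opensWithoutCaching(17_000)); // 17000
        System.out.println(opensWithCaching(17_000));    // 1
    }
}
```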

To solve this, Spring provides a few options. The simplest is CachingConnectionFactory, which caches a single Connection and allows many Sessions to be opened on that Connection. A simple way to add this to your @Configuration above would be something like:

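import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.connection.CachingConnectionFactory;
import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.services.sqs.AmazonSQSClient;

// Inside your @Configuration class:
@Bean
public ConnectionFactory connectionFactory(AmazonSQSClient amazonSQSclient) {

    SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), amazonSQSclient);

    // Doing the following is key!
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setTargetConnectionFactory(sqsConnectionFactory);
    // Set the cachingConnectionFactory properties to your liking here...

    return cachingConnectionFactory;
}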

If you want a more capable JMS pooling solution (one that pools Connections and MessageProducers in addition to multiple Sessions), you can use JmsPoolConnectionFactory from the relatively new PooledJMS project, or a similar library.
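For intuition about what pooling adds over caching a single connection, here is a minimal generic object pool sketch. `SimplePool` is hypothetical and is not the PooledJMS API: it reuses idle objects, creates new ones up to a cap, and blocks once the cap is reached:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

class SimplePool<T> {
    private final BlockingQueue<T> idle;
    private final Supplier<T> factory;
    private final int maxSize;
    private int created = 0;

    SimplePool(int maxSize, Supplier<T> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
        this.idle = new ArrayBlockingQueue<>(maxSize);
    }

    // Reuse an idle object if available, create one if under the cap, else wait for a release
    T borrow() throws InterruptedException {
        T obj = idle.poll();
        if (obj != null) {
            return obj;
        }
        synchronized (this) {
            if (created < maxSize) {
                created++;
                return factory.get();
            }
        }
        return idle.take();
    }

    void release(T obj) {
        idle.offer(obj); // never more than maxSize objects exist, so this always fits
    }

    synchronized int created() {
        return created;
    }
}
```

Borrowing and releasing sequentially keeps reusing a single object; only concurrent borrowers cause new ones to be created, up to `maxSize`.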

Dovmo answered Oct 19 '22 21:10