
Reactive SQS Poller with backpressure

I'm trying to create an SQS poller which:

  • Polls with exponential backoff (to reduce the number of requests when there are no messages in the queue)
  • Queries SQS more often if there are a lot of messages in the queue
  • Applies backpressure: once a certain number of messages has been received, it stops polling
  • Is not throttled by the AWS API rate limit

As an example I'm using this RxJava implementation, which is easily transformed to Project Reactor, and enriching it with backpressure.

private static final Long DEFAULT_BACKOFF = 500L;
private static final Long MAX_BACKOFF = 8000L;
private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
private static volatile boolean stopRequested;

public Flux<Message> pollMessages(GetQueueUrlResult q)
{
    return Flux.create(sink -> {
        long backoff = DEFAULT_BACKOFF;

        while (!stopRequested)
        {
            if (sink.isCancelled())
            {
                sink.error(new RuntimeException("Stop requested"));
                break;
            }

            Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
                    ? amazonSQS.receiveMessageAsync(createRequest(q))
                    : completedFuture(new ReceiveMessageResult());

            try
            {
                ReceiveMessageResult result = future.get();

                if (result != null && !result.getMessages().isEmpty())
                {
                    backoff = DEFAULT_BACKOFF;

                    LOGGER.info("New messages found in queue size={}", result.getMessages().size());

                    result.getMessages().forEach(m -> {
                        if (sink.requestedFromDownstream() > 0L)
                        {
                            sink.next(m);
                        }
                    });
                }
                else
                {
                    if (backoff < MAX_BACKOFF)
                    {
                        backoff = backoff * 2;
                    }

                    LOGGER.debug("No messages found on queue.  Sleeping for {} ms.", backoff);

                    // This is to prevent rate limiting by the AWS api
                    Thread.sleep(backoff);
                }
            }
            catch (InterruptedException e)
            {
                stopRequested = true;
            }
            catch (ExecutionException e)
            {
                sink.error(e);
            }
        }
    });
}

The implementation seems to work, but there are a few questions:

  • It looks like querying the Future result in a loop could be done with Reactor primitives. I tried Flux.generate but was not able to control the number of async calls made to SqsClient.
  • With the Flux.interval approach, I don't understand how to properly implement the backoff policy.
  • I don't like the Thread.sleep call. Any ideas how to replace it?
  • How do I properly stop the loop on a cancel signal? sink.error is currently used to cover that case.
user1570815 asked Jun 16 '18 00:06



1 Answer

What do you think about the following solution:

    private static final Integer batchSize = 1;
    private static final Integer intervalRequest = 3000;
    private static final Integer waitTimeout = 10;
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private static final SqsAsyncClient sqsAsync =
       SqsAsyncClient
         .builder()
         .endpointOverride(URI.create(queueUrl))
         .build();

    public static Flux<Message> sqsPublisher =
        Flux.create(sink -> {
            // Poll on a fixed delay; each run asks SQS for at most as many
            // messages as downstream has requested (capped at batchSize).
            ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(() -> {
                long numberOfRequests = Math.min(sink.requestedFromDownstream(), batchSize);
                if (numberOfRequests > 0) {
                    ReceiveMessageRequest request = ReceiveMessageRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .maxNumberOfMessages((int) numberOfRequests)
                            .waitTimeSeconds(waitTimeout).build();

                    // Forward failures to the subscriber instead of dropping them.
                    sqsAsync.receiveMessage(request).whenComplete((responseValue, error) -> {
                        if (error != null) {
                            sink.error(error);
                        } else if (responseValue != null && responseValue.messages() != null) {
                            responseValue.messages().stream().limit(numberOfRequests).forEach(sink::next);
                        }
                    });
                }
            }, intervalRequest, intervalRequest, TimeUnit.MILLISECONDS);

            // Stop polling when the subscriber cancels or the Flux terminates.
            sink.onDispose(() -> task.cancel(false));
        });
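
The fixed-delay version above does not back off when the queue is empty. One way to get the exponential backoff from the question without Thread.sleep might be a task that reschedules itself with a computed delay. The following is only a sketch using plain JDK classes: BackoffPoller and its fetch, demand, emit and onError parameters are hypothetical names, and fetch is assumed to wrap whatever async SQS receive call you use. It caps the delay with Math.min rather than the question's if check.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.LongSupplier;

// Hypothetical helper; it is not part of Reactor or the AWS SDK.
class BackoffPoller<T> {
    static final long DEFAULT_BACKOFF = 500L; // ms, same values as in the question
    static final long MAX_BACKOFF = 8000L;

    private final ScheduledExecutorService scheduler;
    private final int batchSize;
    private final IntFunction<CompletableFuture<List<T>>> fetch; // assumed wrapper around the receive call
    private final LongSupplier demand;         // e.g. sink::requestedFromDownstream
    private final Consumer<T> emit;            // e.g. sink::next
    private final Consumer<Throwable> onError; // e.g. sink::error
    private final AtomicBoolean stopped = new AtomicBoolean();
    private volatile long backoff = DEFAULT_BACKOFF;

    BackoffPoller(ScheduledExecutorService scheduler, int batchSize,
                  IntFunction<CompletableFuture<List<T>>> fetch, LongSupplier demand,
                  Consumer<T> emit, Consumer<Throwable> onError) {
        this.scheduler = scheduler;
        this.batchSize = batchSize;
        this.fetch = fetch;
        this.demand = demand;
        this.emit = emit;
        this.onError = onError;
    }

    // Backoff rule: reset after a non-empty poll, otherwise double up to the cap.
    public static long nextDelay(long current, boolean gotMessages) {
        return gotMessages ? DEFAULT_BACKOFF : Math.min(current * 2, MAX_BACKOFF);
    }

    // Runs one poll; completes with the delay in ms to wait before the next one.
    public CompletableFuture<Long> pollOnce() {
        int wanted = (int) Math.min(demand.getAsLong(), batchSize);
        if (wanted <= 0) {
            // No downstream demand: skip the remote call and check again later.
            return CompletableFuture.completedFuture(backoff);
        }
        return fetch.apply(wanted).thenApply(batch -> {
            // Guard: never emit more than was requested, extra items are dropped.
            batch.stream().limit(wanted).forEach(emit);
            backoff = nextDelay(backoff, !batch.isEmpty());
            // Poll again at once while the queue is busy, back off when it is empty.
            return batch.isEmpty() ? backoff : 0L;
        });
    }

    public void start() { loop(0L); }

    public void stop() { stopped.set(true); }

    // The next poll is scheduled only after the previous one completes,
    // so at most one request is in flight at any time.
    private void loop(long delayMs) {
        if (stopped.get()) return;
        scheduler.schedule(() -> {
            if (stopped.get()) return;
            try {
                pollOnce().whenComplete((next, error) -> {
                    if (error != null) {
                        stop();
                        onError.accept(error);
                    } else {
                        loop(next);
                    }
                });
            } catch (RuntimeException e) {
                stop();
                onError.accept(e);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}
```

Inside Flux.create this could presumably be wired with sink::requestedFromDownstream as demand, sink::next as emit and sink::error as onError, calling start() in the create lambda and registering sink.onDispose(poller::stop) so that a cancel ends the loop without signalling an error.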
user3725190 answered Oct 30 '22 22:10
