Trying to create SQS Poller which:
As an example I'm using this JavaRx implementation which is easily transformed to Project Reactor and enrich it with backpressure.
private static final Long DEFAULT_BACKOFF = 500L;
private static final Long MAX_BACKOFF = 8000L;
private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
private static volatile boolean stopRequested;
public Flux<Message> pollMessages(GetQueueUrlResult q)
{
return Flux.create(sink -> {
long backoff = DEFAULT_BACKOFF;
while (!stopRequested)
{
if (sink.isCancelled())
{
sink.error(new RuntimeException("Stop requested"));
break;
}
Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
? amazonSQS.receiveMessageAsync(createRequest(q))
: completedFuture(new ReceiveMessageResult());
try
{
ReceiveMessageResult result = future.get();
if (result != null && !result.getMessages().isEmpty())
{
backoff = DEFAULT_BACKOFF;
LOGGER.info("New messages found in queue size={}", result.getMessages().size());
result.getMessages().forEach(m -> {
if (sink.requestedFromDownstream() > 0L)
{
sink.next(m);
}
});
}
else
{
if (backoff < MAX_BACKOFF)
{
backoff = backoff * 2;
}
LOGGER.debug("No messages found on queue. Sleeping for {} ms.", backoff);
// This is to prevent rate limiting by the AWS api
Thread.sleep(backoff);
}
}
catch (InterruptedException e)
{
stopRequested = true;
}
catch (ExecutionException e)
{
sink.error(e);
}
}
});
}
Implementation seems working but there are few questions:
Flux.generate
but was not able to control number of async call made to SqsClientFlux.interval
approach don't understand how to proper implement backoff policyThread.sleep
call any ideas how to replace it?sink.error
is used to cover that case now.What do you think about the following solution:
private static final Integer batchSize = 1;
private static final Integer intervalRequest = 3000;
private static final Integer waitTimeout = 10;
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static final SqsAsyncClient sqsAsync =
SqsAsyncClient
.builder()
.endpointOverride(URI.create(queueUrl))
.build();
public static Flux<Message> sqsPublisher =
Flux.create(sink -> {
if (sink.isCancelled()) {
sink.error(new RuntimeException("Stop requested"));
}
scheduler.scheduleWithFixedDelay(() -> {
long numberOfRequests = Math.min(sink.requestedFromDownstream(), batchSize);
if (numberOfRequests > 0) {
ReceiveMessageRequest request = ReceiveMessageRequest
.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages((int) numberOfRequests)
.waitTimeSeconds(waitTimeout).build();
CompletableFuture<ReceiveMessageResponse> response = sqsAsync.receiveMessage(request);
response.thenApply(responseValue -> {
if (responseValue != null && responseValue.messages() != null && !responseValue.messages().isEmpty()) {
responseValue.messages().stream().limit(numberOfRequests).forEach(sink::next);
}
return responseValue;
});
}
}, intervalRequest, intervalRequest, TimeUnit.MILLISECONDS);
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With