I'm creating a Flux using Flux.generate(). The generator (a Consumer) reads from a message queue. The problem is that this read is slow (occasionally it takes as long as 1-2 seconds), and while the generator is blocked the Flux stops processing.
package com.github.loa.vault.service.listener;

import com.github.loa.document.service.domain.DocumentType;
import com.github.loa.queue.service.QueueManipulator;
import com.github.loa.queue.service.domain.Queue;
import com.github.loa.queue.service.domain.message.DocumentArchivingMessage;
import com.github.loa.vault.service.domain.DocumentArchivingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.SynchronousSink;

import java.util.function.Consumer;

@Slf4j
@Service
@RequiredArgsConstructor
public class VaultQueueConsumer implements Consumer<SynchronousSink<DocumentArchivingContext>> {

    private final QueueManipulator queueManipulator;

    @Override
    public void accept(final SynchronousSink<DocumentArchivingContext> documentSourceItemSynchronousSink) {
        // Blocking read from the queue; this is the slow call.
        final DocumentArchivingMessage documentArchivingMessage = (DocumentArchivingMessage)
                queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE);

        documentSourceItemSynchronousSink.next(
                DocumentArchivingContext.builder()
                        .type(DocumentType.valueOf(documentArchivingMessage.getType()))
                        .source(documentArchivingMessage.getSource())
                        .content(documentArchivingMessage.getContent())
                        .build()
        );
    }
}
Adding parallel() doesn't help, because the generator is still invoked one call at a time:
Flux.generate(vaultQueueConsumer)
        .parallel()
        .runOn(Schedulers.parallel())
        .flatMap(vaultDocumentManager::archiveDocument)
        .subscribe();
Does anybody know how to make the generator parallel? I don't want to use Flux.create() because then I would lose backpressure.
Mono.just(1).repeat() // create an infinite flux; maybe there is a nicer way to do that?
        .flatMap(this::readFromQueue, 100) // define queue polling concurrency
        .flatMap(this::archiveDocument)
        .subscribe();

private Mono<String> readFromQueue(Integer ignore) {
    return Mono.fromCallable(() -> {
        Thread.sleep(1500); // your actual blocking queue polling here
        return "queue_element";
    }).subscribeOn(Schedulers.elastic()); // run the blocking call on a dedicated thread pool
    // Note: Schedulers.elastic() is deprecated since Reactor 3.4; Schedulers.boundedElastic() is its replacement.
}
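The effect of the concurrency argument can be illustrated without Reactor. The following is a minimal plain-JDK sketch, not how Reactor implements flatMap; the names ConcurrentPollingSketch, timeReads and readMessage, and the 100 ms latency, are assumptions made up for the illustration. It issues eight simulated slow reads, first one at a time and then with four in flight, and reports the elapsed time of each run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Plain-JDK analogy (no Reactor): slow reads issued with a fixed number in flight.
class ConcurrentPollingSketch {

    private static final AtomicLong COUNTER = new AtomicLong();

    // Hypothetical stand-in for the blocking queue read; sleeps to simulate latency.
    static String readMessage(String queue, long latencyMillis) {
        try {
            Thread.sleep(latencyMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue + " " + COUNTER.incrementAndGet();
    }

    // Performs `count` reads with at most `concurrency` running at once; returns elapsed milliseconds.
    static long timeReads(int count, int concurrency, long latencyMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        long start = System.nanoTime();
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                futures.add(CompletableFuture.supplyAsync(
                        () -> readMessage("test", latencyMillis), pool));
            }
            // Wait for every read to finish before stopping the clock.
            futures.forEach(CompletableFuture::join);
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("1 in flight: " + timeReads(8, 1, 100) + " ms");
        System.out.println("4 in flight: " + timeReads(8, 4, 100) + " ms");
    }
}
```

With four reads in flight the same eight reads should finish in noticeably less time than the sequential run, which is the reason for passing a concurrency value to flatMap when each read blocks.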
The problem is that vaultQueueConsumer performs a slow operation inside the generator.
The solution is to move this slow operation out of generate and into a map step, which can be parallelised.
One idea is to have generate emit the name of the queue the messages should be consumed from, and to do the actual message consumption in map after making the flux parallel:
String queue = "test";
Flux.<String>generate(synchronousSink -> synchronousSink.next(queue))
        .parallel()
        .runOn(Schedulers.parallel())
        .map(queueManipulator::readMessage)
        .doOnNext(log::info)
        .subscribe();
The fake QueueManipulator sleeps 1-2 seconds before returning a message:
public class QueueManipulator {

    private final AtomicLong counter = new AtomicLong();

    public String readMessage(String queue) {
        sleep(); // sleep 1-2 seconds
        return queue + " " + counter.incrementAndGet();
    }
    //...
}
This way message consumption is done in parallel:
12:49:22.362 [parallel-4] - test 2
12:49:22.362 [parallel-3] - test 4
12:49:22.362 [parallel-2] - test 1
12:49:22.362 [parallel-1] - test 3
12:49:23.369 [parallel-3] - test 6
12:49:23.369 [parallel-1] - test 5
12:49:23.369 [parallel-2] - test 7
12:49:23.369 [parallel-4] - test 8
The solution above is straightforward but may look like a "hack".
Another idea is to call Flux.generate inside flatMap:
String queue = "test";
int parallelism = 5;
Flux.range(0, parallelism)
        .flatMap(i ->
                Flux.<String>generate(synchronousSink -> {
                    synchronousSink.next(queueManipulator.readMessage(queue));
                }).subscribeOn(Schedulers.parallel()))
        .doOnNext(log::info)
        .subscribe();
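The shape of this flatMap variant, a fixed number of independent polling loops feeding one downstream, can also be pictured with plain JDK primitives. The sketch below is only an analogy under stated assumptions, not Reactor code: PollerLoopsSketch and collect are hypothetical names, and the sleep stands in for the slow queue read. The bounded ArrayBlockingQueue plays a role loosely similar to backpressure, because put() blocks the pollers whenever the consumer falls behind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Plain-JDK analogy (no Reactor): several polling loops feeding one bounded buffer.
class PollerLoopsSketch {

    // Starts `parallelism` polling loops and returns the first `wanted` messages they produce.
    static List<String> collect(int parallelism, int wanted, long latencyMillis) {
        AtomicLong counter = new AtomicLong();
        // Bounded buffer: when it is full, pollers wait instead of reading ahead.
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(parallelism);
        ExecutorService pollers = Executors.newFixedThreadPool(parallelism);

        for (int i = 0; i < parallelism; i++) {
            pollers.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        Thread.sleep(latencyMillis); // simulated slow queue read
                        buffer.put("test " + counter.incrementAndGet());
                    }
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts the loop; restore the flag and exit.
                    Thread.currentThread().interrupt();
                }
            });
        }

        List<String> received = new ArrayList<>();
        try {
            while (received.size() < wanted) {
                received.add(buffer.take()); // the single downstream consumer
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pollers.shutdownNow(); // stop all polling loops
        }
        return received;
    }

    public static void main(String[] args) {
        collect(5, 10, 100).forEach(System.out::println);
    }
}
```

Each loop reads sequentially, as a single generate does, and the parallelism comes only from running several loops at once.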