
Preload elements for Flux.generate(...)

I'm creating a Flux using Flux.generate(). The generator (a Consumer) reads from a message queue. The problem is that each read takes quite a long time (occasionally as much as 1-2 seconds), and the Flux stalls while the generator is blocked.

package com.github.loa.vault.service.listener;

import com.github.loa.document.service.domain.DocumentType;
import com.github.loa.queue.service.QueueManipulator;
import com.github.loa.queue.service.domain.Queue;
import com.github.loa.queue.service.domain.message.DocumentArchivingMessage;
import com.github.loa.vault.service.domain.DocumentArchivingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.SynchronousSink;

import java.util.function.Consumer;

@Slf4j
@Service
@RequiredArgsConstructor
public class VaultQueueConsumer implements Consumer<SynchronousSink<DocumentArchivingContext>> {

    private final QueueManipulator queueManipulator;

    @Override
    public void accept(final SynchronousSink<DocumentArchivingContext> documentSourceItemSynchronousSink) {
        final DocumentArchivingMessage documentArchivingMessage = (DocumentArchivingMessage)
                queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE);

        documentSourceItemSynchronousSink.next(
                DocumentArchivingContext.builder()
                        .type(DocumentType.valueOf(documentArchivingMessage.getType()))
                        .source(documentArchivingMessage.getSource())
                        .content(documentArchivingMessage.getContent())
                        .build()
        );
    }
}

Adding parallel() obviously doesn't help, because the generator is still called for one element at a time.

Flux.generate(vaultQueueConsumer)
    .parallel()
    .runOn(Schedulers.parallel()) 
    .flatMap(vaultDocumentManager::archiveDocument)
    .subscribe();

Does anybody know how to make the generator parallel? I don't want to use Flux.create() because then I would lose backpressure.

Asked by Lakatos Gyula, Apr 20 '26 05:04


2 Answers

Mono.just(1).repeat()  // create infinite flux, maybe there is a nicer way for that?
    .flatMap(this::readFromQueue, 100) // define queue polling concurrency: at most 100 reads in flight
    .flatMap(this::archiveDocument)
    .subscribe();

private Mono<String> readFromQueue(Integer ignore) {
    return Mono.fromCallable(() -> {
        Thread.sleep(1500); // your actual blocking queue polling here
        return "queue_element";
    // dedicate the blocking call to a thread pool meant for blocking work;
    // boundedElastic() replaces the deprecated Schedulers.elastic()
    }).subscribeOn(Schedulers.boundedElastic());
}
Answered by Martin Tarjányi, Apr 22 '26 18:04


The problem is that vaultQueueConsumer performs a slow operation inside generate, which always invokes the generator sequentially. So the solution is to move this slow operation out of generate and into a map step that can be parallelised.

One idea is to have the generator emit only the name of the queue the messages have to be consumed from, and to do the actual message consumption in map after making the Flux parallel:

String queue = "test";
Flux.<String>generate(synchronousSink -> synchronousSink.next(queue))
    .parallel()
    .runOn(Schedulers.parallel())
    .map(queueManipulator::readMessage)
    .doOnNext(log::info)
    .subscribe();

The fake QueueManipulator sleeps 1-2 seconds before returning a message:

public class QueueManipulator {

  private final AtomicLong counter = new AtomicLong();

  public String readMessage(String queue) {
    sleep(); //sleep 1-2 seconds
    return queue + " " + counter.incrementAndGet();
  }
  //...
}

This way message consumption is done in parallel:

12:49:22.362 [parallel-4] - test 2
12:49:22.362 [parallel-3] - test 4
12:49:22.362 [parallel-2] - test 1
12:49:22.362 [parallel-1] - test 3
12:49:23.369 [parallel-3] - test 6
12:49:23.369 [parallel-1] - test 5
12:49:23.369 [parallel-2] - test 7
12:49:23.369 [parallel-4] - test 8

The solution above is straightforward but may look like a "hack".

Another idea is to call Flux.generate inside flatMap:

String queue = "test";
int parallelism = 5;
Flux.range(0, parallelism)
    .flatMap(i ->
        Flux.<String>generate(synchronousSink -> {
          synchronousSink.next(queueManipulator.readMessage(queue));
        }).subscribeOn(Schedulers.parallel()))
    .doOnNext(log::info)
    .subscribe();
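
To see the mechanism behind the flatMap variant above in isolation, here is a minimal plain-JDK sketch. It is deliberately not Reactor code: it swaps in worker threads and a bounded ArrayBlockingQueue to illustrate the same idea, where each worker is an independent sequential read loop (like one inner generator) and the bounded buffer keeps the readers from running ahead of the consumer. PrefetchingReader, PrefetchDemo and the 50 ms sleep are hypothetical stand-ins, not part of the question's code or of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of Reactor or of the question's code base.
class PrefetchingReader<T> implements AutoCloseable {

    private final BlockingQueue<T> buffer;
    private final ExecutorService workers;

    PrefetchingReader(Callable<T> slowRead, int concurrency, int bufferSize) {
        this.buffer = new ArrayBlockingQueue<>(bufferSize);
        this.workers = Executors.newFixedThreadPool(concurrency);
        for (int i = 0; i < concurrency; i++) {
            // Each worker is its own sequential read loop.
            workers.execute(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        T message = slowRead.call();
                        // put() blocks while the buffer is full, so workers
                        // stop reading until the consumer catches up.
                        buffer.put(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop on close()
                } catch (Exception e) {
                    throw new IllegalStateException("read failed", e);
                }
            });
        }
    }

    // Blocks until a preloaded message is available.
    T take() throws InterruptedException {
        return buffer.take();
    }

    @Override
    public void close() {
        workers.shutdownNow(); // interrupts workers blocked in call() or put()
    }
}

class PrefetchDemo {

    // Counts every simulated queue read.
    static final AtomicLong READS = new AtomicLong();

    // Consumes `count` messages using 4 workers and a buffer of 2.
    static List<Long> consume(int count) {
        Callable<Long> slowRead = () -> {
            Thread.sleep(50); // stands in for the slow queue read
            return READS.incrementAndGet();
        };
        List<Long> received = new ArrayList<>();
        try (PrefetchingReader<Long> reader = new PrefetchingReader<>(slowRead, 4, 2)) {
            for (int i = 0; i < count; i++) {
                received.add(reader.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

In this sketch the readers can never be more than 4 + 2 messages ahead of the consumer (one in-flight read per worker plus the buffer capacity). That is only a rough analogue of the demand-based backpressure a Reactor pipeline applies, but it shows why several slow reads can overlap without reading unboundedly.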
Answered by Evgeniy Khyst, Apr 22 '26 17:04


