Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Recursion on reactive streams with Project Reactor

My objective is to traverse a graph of directories and log all their names, using reactive streams and Project Reactor.

As the filesystem is remote, the calls to it are blocking. So I would like to keep the execution of the blocking call separate from the rest of my non-blocking async code. I'm doing this using this recommendation: http://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking.

Here's the structure I need to traverse:

/
  /jupiter
    /phase-1
      /sub-phase-1
      /sub-phase-2
      /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

  /earth
    /phase-1
      /sub-phase-1
      /sub-phase-2
      /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

  /mars
    /phase-1
      /sub-phase-1
      /sub-phase-2
      /sub-phase-3
    /phase-2
    /phase-3
    /phase-4

And here's the code I came up with so far:

public class ReactorEngine {

    private static Logger log = LoggerFactory.getLogger(ReactorEngine.class);

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        Server server = new Server();

        Flux.fromIterable(server.getChildren("/"))
            .flatMap(parent -> Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic()))
            .publishOn(Schedulers.elastic())
            .doOnTerminate(latch::countDown)
            .subscribe(ReactorEngine::handleResponse);

        latch.await();
    }

    private static void handleResponse(List<String> value) {
        log.info("Received: " + value);
    }

}

public class Server {

    public List<String> getChildren(final String path) {
        // Generate some I/O
        ...
    }
}

So I start from the top level directories, and asynchronously request the first level down (their children). All goes well, and this is the output:

15:35:05.902 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:35:07.062 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:35:07.140 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:35:07.140 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:35:07.140 [elastic-5] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:35:08.140 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/earth/phase-1/, /earth/phase-2/, /earth/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/jupiter/phase-1/, /jupiter/phase-2/, /jupiter/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/mars/phase-1/, /mars/phase-2/, /mars/phase-3/]

Now my question is how do I put back into the flux the elements that came as results, so that the engine will recursively call the server.getChildren(parent) until the entire directory graph is traversed?

Is actually recursion the way to go, or is there a better "reactive" way of doing this, perhaps through operators?

Thanks!

Edit

The expand(Function) operator suggested by Simon works nice to traverse the graph. I've changed the code to this:

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    Server server = new Server();

    Flux.fromIterable(server.getChildren("/"))
        .expand(p -> Flux.fromIterable(server.getChildren(p)).subscribeOn(Schedulers.elastic()))
        .publishOn(Schedulers.elastic())
        .doOnTerminate(latch::countDown)
        .subscribe(ReactorEngine::handleResponse);

    latch.await();
}

However, I lost the async way of calling the blocking server.getChildren(String) method of my server. As you can see in these logs, each subdirectory is obtained synchronously, once a second:

15:57:55.398 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:57:56.558 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:57:56.593 [main] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:57:56.593 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/
15:57:57.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:57:57.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/
15:57:58.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:57:58.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/
15:57:59.599 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-1/...
15:57:59.599 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-1/
15:58:00.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-2/...
15:58:00.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-2/
15:58:01.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-3/...
15:58:01.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-3/
15:58:02.601 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-4/...
15:58:02.601 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-4/
15:58:03.602 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-1/...
15:58:03.603 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-1/
15:58:04.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-2/...
15:58:04.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-2/
15:58:05.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-3/...
15:58:05.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-3/
15:58:06.605 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-4/...
15:58:06.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-4/
15:58:07.605 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-1/...
15:58:07.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-1/
15:58:08.606 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-2/...
15:58:08.606 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-2/
15:58:09.607 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-3/...
15:58:09.607 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-3/
15:58:10.608 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-4/...
15:58:10.608 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-4/

Could you please provide a hint on how to bring the call to Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic()) back into the scheme? There's no Flux.fromCallable() that I can call, and perhaps for a good reason.

But as I am really new to reactive programming and the concepts of Project Reactor, it's kind of hard to wrap my head around this way of doing async.

Thank you.

like image 578
Bogdan Minciu Avatar asked Dec 16 '17 15:12

Bogdan Minciu


1 Answers

There's an operator for that :) Have a look at expand and expandDeep

like image 120
Simon Baslé Avatar answered Sep 21 '22 02:09

Simon Baslé