Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

vert.x Wait for reply on multiple messages

In vert.x I can send a message to another verticle and "wait asynchronously" for the reply.

The problem is: I want to send messages to multiple verticles and make an async handler to be called when all verticles replied.

Is this possible or is there a better design for achieving this functionality?

EDIT:

Supose I have a verticle A which sends messages to verticles B,C and D. Each verticle (B,C,D) do something with the message and return A some data. The verticle A then receives the response from B,C,D and does something with all the data. The problem is I have a handler for each message I send (one for A, one for B, one for C), I want one handler to be called when all the replies arrived.

like image 258
TiagoOliveira Avatar asked Aug 07 '14 11:08

TiagoOliveira


People also ask

How do you wait for Vertx future?

setHandler, you can use future. onSuccess(value -> {}). onFailure(failure->{}); Also I usually have helper functions like the one either to complete or fail a future I have: FutureUtil.

What is non blocking in Vertx?

One of the key advantages of Vert. x over many legacy application platforms is that it is almost entirely non-blocking (of kernel threads) - this allows it to handle a lot of concurrency (e.g. handle many connections, or messages) using a very small number of kernel threads, which allows it to scale very well.

How does Vertx work?

The Vertx instance creates a number of threads internally to handle the exchange of messages between verticles. These threads are not daemon threads, so they prevent the JVM from shutting down, even if the main thread creating the Vertx instance terminates.

What is Vertx verticle?

A verticle is the fundamental processing unit in Vert. x. The role of a verticle is to encapsulate a technical functional unit for processing events such as exposing an HTTP API and responding to requests, providing a repository interface on top of a database, or issuing requests to a third-party system.


2 Answers

As of Vert.x 3.2, the documentation explains how to coordinate asyncronously using Futureand CompositeFuture.

So lets say you want to make two send calls over the event bus and do something when both are succeeded:

Future<Message> f1 = Future.future();
eventBus.send("first.address", "first message", f1.completer());

Future<Message> f2 = Future.future();
eventBus.send("second.address", "second message", f2.completer());

CompositeFuture.all(f1, f2).setHandler(result -> {
  // business as usual
});

Up to 6 futures can be passed as arguments or alternatively they can be passed as a list.

like image 190
Yan Foto Avatar answered Sep 19 '22 08:09

Yan Foto


The best approach for this is to use Reactive Extensions, as implemented by Netflix's Rx.Java and offered by the RxVertx Module.

The huge variety of operators allows you to do things like "zipping" the results of several asynchronous calls into a new result and do whatever you want with it.

I have a simple demo available on GitHub, which contains:

final Observable<JsonObject> meters = observeMetricsSource(metricsAddress, METERS_BUS_REQUEST, "meters", rx);
final Observable<JsonObject> histograms = observeMetricsSource(metricsAddress, HISTOGRAMS_BUS_REQUEST, "histograms", rx);
subscribeAndRespondJson(zip(meters, histograms, (jo1, jo2) -> jo1.mergeIn(jo2)), req);

This snippet shows how two observables coming from two event bus asynchronous interactions get "zipped" (ie merged) into one final HTTP response.

like image 34
David Dossot Avatar answered Sep 20 '22 08:09

David Dossot