In vert.x I can send a message to another verticle and "wait asynchronously" for the reply.
The problem is: I want to send messages to multiple verticles and have a single async handler called once all of them have replied.
Is this possible or is there a better design for achieving this functionality?
EDIT:
Suppose I have a verticle A which sends messages to verticles B, C and D. Each verticle (B, C, D) does something with the message and returns some data to A. Verticle A then receives the responses from B, C and D and does something with all the data. The problem is that I have a handler for each message I send (one for B, one for C, one for D); I want one handler to be called when all the replies have arrived.
setHandler, you can use future.onSuccess(value -> {}).onFailure(failure -> {}); Also, I usually have helper functions, like one that either completes or fails a future: FutureUtil.
One of the key advantages of Vert.x over many legacy application platforms is that it is almost entirely non-blocking (of kernel threads). This allows it to handle a lot of concurrency (e.g. many connections or messages) using a very small number of kernel threads, which allows it to scale very well.
The Vertx instance creates a number of threads internally to handle the exchange of messages between verticles. These threads are not daemon threads, so they prevent the JVM from shutting down, even if the main thread creating the Vertx instance terminates.
A verticle is the fundamental processing unit in Vert.x. The role of a verticle is to encapsulate a technical functional unit for processing events such as exposing an HTTP API and responding to requests, providing a repository interface on top of a database, or issuing requests to a third-party system.
As of Vert.x 3.2, the documentation explains how to coordinate asynchronously using Future and CompositeFuture.
So let's say you want to make two send calls over the event bus and do something when both have succeeded:
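// Each future completes when the reply to its message arrives, or fails if the send fails.
// String reply bodies are assumed here; use whatever body type your verticles reply with.
Future<Message<String>> f1 = Future.future();
eventBus.send("first.address", "first message", f1.completer());
Future<Message<String>> f2 = Future.future();
eventBus.send("second.address", "second message", f2.completer());
// The handler runs once: after both replies have arrived, or as soon as one of them fails.
CompositeFuture.all(f1, f2).setHandler(result -> {
// business as usual
});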
Up to 6 futures can be passed as arguments or alternatively they can be passed as a list.
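If you want to try the "join a list of replies" idea without a running Vert.x instance, here is a minimal sketch that uses the JDK's CompletableFuture in place of Vert.x's CompositeFuture. It is not the Vert.x API, only an analogue of the same pattern. The names JoinReplies and allReplies are made up for illustration, and the three futures stand in for the replies from verticles B, C and D.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class JoinReplies {

    // Hypothetical helper: returns a future that completes with every reply,
    // in input order, once all of the given futures have succeeded.
    // If any input future fails, the returned future fails as well.
    static <T> CompletableFuture<List<T>> allReplies(List<CompletableFuture<T>> replies) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(replies.toArray(new CompletableFuture<?>[0]));
        // join() cannot block here because every future is already complete.
        return all.thenApply(ignored ->
                replies.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Stand-ins for the replies from B, C and D (assumed to be plain strings).
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        CompletableFuture<String> d = new CompletableFuture<>();

        // One handler for all three replies.
        allReplies(Arrays.asList(b, c, d))
                .thenAccept(data -> System.out.println("all replied: " + data));

        // Replies may arrive in any order; the handler fires after the last one.
        c.complete("from C");
        b.complete("from B");
        d.complete("from D");
    }
}
```

The list-based form is the one to reach for when the number of target verticles is not known at compile time.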
The best approach for this is to use Reactive Extensions, as implemented by Netflix's RxJava and offered by the RxVertx module.
The huge variety of operators allows you to do things like "zipping" the results of several asynchronous calls into a new result and do whatever you want with it.
I have a simple demo available on GitHub, which contains:
final Observable<JsonObject> meters = observeMetricsSource(metricsAddress, METERS_BUS_REQUEST, "meters", rx);
final Observable<JsonObject> histograms = observeMetricsSource(metricsAddress, HISTOGRAMS_BUS_REQUEST, "histograms", rx);
subscribeAndRespondJson(zip(meters, histograms, (jo1, jo2) -> jo1.mergeIn(jo2)), req);
This snippet shows how two observables coming from two asynchronous event bus interactions get "zipped" (i.e. merged) into one final HTTP response.
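The helper methods in that snippet live in the demo project, so it will not run on its own. As a rough, self-contained sketch of the same "zip two asynchronous results" idea, the example below uses the JDK's CompletableFuture.thenCombine instead of RxJava's zip. For sources that each produce a single value the two behave alike. The class ZipReplies, its methods, and the use of plain maps in place of JsonObject are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class ZipReplies {

    // Hypothetical stand-in for merging two JSON objects: copies both maps into a new one.
    static Map<String, Object> merge(Map<String, Object> left, Map<String, Object> right) {
        Map<String, Object> merged = new LinkedHashMap<>(left);
        merged.putAll(right);
        return merged;
    }

    // Pairs the two asynchronous results and merges them once both have arrived.
    static CompletableFuture<Map<String, Object>> zip(
            CompletableFuture<Map<String, Object>> first,
            CompletableFuture<Map<String, Object>> second) {
        return first.thenCombine(second, ZipReplies::merge);
    }

    public static void main(String[] args) {
        // Stand-ins for the two event bus replies.
        CompletableFuture<Map<String, Object>> meters = new CompletableFuture<>();
        CompletableFuture<Map<String, Object>> histograms = new CompletableFuture<>();

        // Single handler that would write the combined HTTP response.
        zip(meters, histograms)
                .thenAccept(body -> System.out.println("response body: " + body));

        // The merged result is produced only after both sides have completed.
        histograms.complete(Collections.<String, Object>singletonMap("histograms", 2));
        meters.complete(Collections.<String, Object>singletonMap("meters", 1));
    }
}
```

With RxJava the advantage over this sketch is the operator set: the same zip works on streams that emit many values, not just on one-shot results.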