We use Futures in vertx in examples like:
Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);
fetchVehicle.compose(vehicleJson -> vehicleDoor(routingContext, client, vehicleJson, lock)).setHandler(
asyncResult -> {
if (asyncResult.succeeded()) {
LOG.info("Door operation succeeded with result {}", asyncResult.result().encode());
handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
}
else {
handler.handle(Future.failedFuture(asyncResult.cause()));
}
});
where we handle 2 calls for example.
OR I have another snippet where I can handle any number of methods:
List<Future> futures = new ArrayList<>();
conversation.getRequestList().forEach(req -> {
Future<Message<Object>> senderFuture = Future.future();
vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, JsonObject.mapFrom(req), deliveryOptions, senderFuture.completer());
// sent successfully. save the replyAddress and the conversation for later/callback
log.info("Saving the conversation for the request.", conversation.getReplyAddress());
pendingCommands.put(req.getBody().getString(MSG_ID), conversation);
futures.add(senderFuture);
});
CompositeFuture.all(futures).setHandler(ar -> {
if (ar.succeeded()) {
handler.handle(Future.succeededFuture());
} else {
log.error("forwardToVWClient VW got result : {}", ar.cause());
handler.handle(Future.failedFuture(ar.cause()));
}
});
Here we are chaining all the requests in the conversation.getRequestList()
without knowing their count in advance.
But the shortcoming of .all()
method is that, we have no control on the order.
How can I chain any number of methods with Vertx Futures (without knowing the exact count of the calls) ?
EDIT:
The official guide talks about sequential composition but the example given has 3 calls. It does not explain how to do it for arbitrary number of calls.
See "Sequential composition" in http://vertx.io/docs/vertx-core/java/
I hope it is clear.
Here is a solution using map & reduce
that executes a method in an orderly fashion and returns the accumulated result in the form of a Future<String>
public static <T> Future<String> chainCall(List<T> list, Function<T, Future<String>> method){
return list.stream().reduce(Future.succeededFuture(),// the initial "future"
(acc, item) -> acc.compose(v -> method.apply(item)), // we return the compose of the previous "future" with "future" returned by next item processing
(a,b) -> Future.future()); // not used! only useful for parallel stream.
}
can be used as in the example below:
chainCall(conversation.getRequestList(), this::sendApiRequestViaBus);
where sendApiRequestViaBus
is:
/**
* @param request The request to process
* @return The result of the request processing.
*/
Future<String> sendApiRequestViaBus(ApiRequest request) {
Future<String> future = Future.future();
String address = CommandUtilsFactory.getInstance(request.getImplementation()).getApiClientAddress();
log.debug("Chain call start msgId {}", request.getId());
vertx.eventBus().send(address, JsonObject.mapFrom(request), deliveryOptions, res -> {
log.debug("Chain call returns {}", request.getId());
if (res.succeeded()) {
future.complete("OK");
} else {
future.fail("KO");
}
});
return future;
}
I hope it helps.
If you want to feed the response from the previous request to the next request, and suppose you have different handlers for each response. You can add a helper method
private <T> Future<T> chain(Future<T> init, List<Function<T, Future<T>>> handlers) {
Future<T> result = init;
for (Function<T, Future<T>> handler : handlers) {
result = result.compose(handler);
}
return result;
}
And then change your code like this
Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);
Function<JsonObject, Future<JsonObject>> vehicleResponseHandler = vehicleJson ->
vehicleDoor(routingContext, client, vehicleJson, lock);
Function<JsonObject, Future<JsonObject>> anotherTrivialHandler = someJsonObj -> {
// add here new request by using information from someJsonObj
LOG.info("Hello from trivial handler {} ", someJsonObj);
return Future.succeededFuture(someJsonObj);
};
List<Function<JsonObject, Future<JsonObject>>> handlers = new ArrayList<>();
handlers.add(vehicleResponseHandler);
handlers.add(anotherTrivialHandler);
chain(fetchVehicle, handlers).setHandler( asyncResult -> {
if (asyncResult.succeeded()) {
handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
} else {
handler.handle(Future.failedFuture(asyncResult.cause()));
}
});
But there is a limitation for this implementation which requires each chained Future
must have the same type parameter T
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With