I have a sender and a consumer that exchange messages:
public class Sender extends AbstractVerticle {
@Override
public void start() {
EventBus eventBus = vertx.eventBus();
eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
if (res.succeeded()) {
System.out.println("Successfully sent reply");
} else {
System.out.println("Failed to send reply." + res.cause());
}
});
eventBus.consumer(Constants.ADDRESS, msg -> System.out.println("received msg from consumer:" + msg.body()));
}
public class Consumer extends AbstractVerticle{
protected EventBus eventBus = null;
@Override
public void start() {
eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
if (res.succeeded()) {
System.out.println("Successfully sent reply");
} else {
System.out.println("Failed to send reply." + res.cause());
}
}));
}
}
I expect that when the consumer replies to the message, it will be received by the sender. However, I get a timeout:
Successfully sent reply
Failed to send reply.(TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: 2, repliedAddress: 1
Deployment:
public class ServiceLauncher {
private static Vertx vertx = Vertx.vertx();
public static void main(String[] args) {
vertx.deployVerticle(new Consumer(), res -> {
if (res.succeeded()) {
System.out.println("Verticle " + Consumer.NAME + " deployed.");
vertx.deployVerticle(new Sender());
System.out.println("Verticle " + Sender.NAME + " deployed.");
} else {
System.out.println("Verticle " + Consumer.NAME + " not deployed.");
}
});
}
What am I doing wrong? Thanx in advance
Update: The problem is in msg.reply() - the consumer doesn't reply to the message but I can't still figure out why.
The timeout occurs not in the sender of the request, but in its recipient.
Handler, defined in msg.reply()
, waits for next reply from the sender. It is not a handler, confirming just send status.
And handler in Sender
's eventBus.send()
also fires when sender receives a reply.
Just remove handler in msg.reply()
and modify handler eventBus.send()
in Sender
in the same manner:
public class Sender extends AbstractVerticle {
public static final String NAME = "SENDER";
@Override
public void start() {
EventBus eventBus = vertx.eventBus();
eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
if (res.succeeded()) {
System.out.println("Successfully received reply: " + res.result().body());
} else {
System.out.println("Failed to send reply." + res.cause());
}
});
}
}
and
public class Consumer extends AbstractVerticle {
public static final String NAME = "CONSUMER";
@Override
public void start() {
final EventBus eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> {
System.out.println("Message received");
msg.reply("Hi from consumer.");
});
}
}
And after execute you'll see:
Verticle CONSUMER deployed.
Verticle SENDER deployed.
Message received
Successfully received reply: Hi from consumer.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With