Vertx: Timeout in message reply

Tags: vert.x

I have a sender and a consumer that exchange messages:

public class Sender extends AbstractVerticle {
    @Override
    public void start() {
        EventBus eventBus = vertx.eventBus();
        eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
            if (res.succeeded()) {
                System.out.println("Successfully sent reply");
            } else {
                System.out.println("Failed to send reply." + res.cause());
            }
        });
        eventBus.consumer(Constants.ADDRESS, msg -> System.out.println("received msg from consumer:" + msg.body()));
    }
}

public class Consumer extends AbstractVerticle {
    protected EventBus eventBus = null;

    @Override
    public void start() {
        eventBus = vertx.eventBus();

        eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
            if (res.succeeded()) {
                System.out.println("Successfully sent reply");
            } else {
                System.out.println("Failed to send reply." + res.cause());
            }
        }));
    }
}

I expect that when the consumer replies to the message, it will be received by the sender. However, I get a timeout:

Successfully sent reply

Failed to send reply.(TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: 2, repliedAddress: 1

Deployment:

public class ServiceLauncher {

    private static Vertx vertx = Vertx.vertx();

    public static void main(String[] args) {
        vertx.deployVerticle(new Consumer(), res -> {
            if (res.succeeded()) {
                System.out.println("Verticle " + Consumer.NAME + " deployed.");
                vertx.deployVerticle(new Sender());
                System.out.println("Verticle " + Sender.NAME + " deployed.");
            } else {
                System.out.println("Verticle " + Consumer.NAME + " not deployed.");
            }
        });
    }
}

What am I doing wrong? Thanks in advance.

Update: The problem is in msg.reply(): the consumer doesn't reply to the message, but I still can't figure out why.

asked Sep 20 '25 11:09 by locus


1 Answer

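The timeout occurs not in the sender of the request but in its recipient. The handler passed to msg.reply() is not a confirmation that the reply was sent: it waits for a further reply from the sender. The Sender never answers the reply, so that handler fails with a timeout after 30000 ms. Likewise, the handler passed to eventBus.send() in the Sender fires when the sender receives a reply, not when the message has been sent.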
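
To make the two handlers easier to picture, here is a minimal plain-Java model of the exchange. It is not Vert.x code: CompletableFuture stands in for a pending reply on the event bus, the class ReplyHandlerModel and the method awaitReply are hypothetical names made up for this sketch, and the 100 ms timeout replaces the 30000 ms seen in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReplyHandlerModel {

    // Models a reply handler: it succeeds only if the other side
    // actually answers before the timeout expires.
    public static String awaitReply(CompletableFuture<String> pendingReply, long timeoutMs) {
        try {
            return "reply: " + pendingReply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "FAILED";
        } catch (ExecutionException e) {
            return "FAILED";
        }
    }

    public static void main(String[] args) {
        // Pending reply awaited by the handler given to eventBus.send() in the Sender.
        CompletableFuture<String> replyToSender = new CompletableFuture<>();
        // Pending reply awaited by the handler given to msg.reply() in the Consumer.
        CompletableFuture<String> replyToConsumer = new CompletableFuture<>();

        // The Consumer answers the Sender's message.
        replyToSender.complete("Hi from consumer.");
        // The Sender never answers that reply, so replyToConsumer is never completed.

        System.out.println("sender handler: " + awaitReply(replyToSender, 100));
        System.out.println("consumer handler: " + awaitReply(replyToConsumer, 100));
    }
}
```

Under these assumptions the first handler sees the consumer's reply and the second one times out, which mirrors the TIMEOUT failure reported by the handler passed to msg.reply() in the question.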

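Just remove the handler from msg.reply() and change the handler of eventBus.send() in Sender so that it processes the received reply. The Sender below also drops its own consumer on Constants.ADDRESS, because send delivers each message to only one handler registered at that address, and that handler could otherwise be the Sender's own: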

public class Sender extends AbstractVerticle {
    public static final String NAME = "SENDER";

    @Override
    public void start() {
        EventBus eventBus = vertx.eventBus();
        eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
            if (res.succeeded()) {
                System.out.println("Successfully received reply: " + res.result().body());
            } else {
                System.out.println("Failed to receive reply: " + res.cause());
            }
        });

    }
}

and

public class Consumer extends AbstractVerticle {
    public static final String NAME = "CONSUMER";

    @Override
    public void start() {
        final EventBus eventBus = vertx.eventBus();
        eventBus.consumer(Constants.ADDRESS, msg -> {
            System.out.println("Message received");
            msg.reply("Hi from consumer.");
        });

    }
}

After running it, you'll see:

Verticle CONSUMER deployed.
Verticle SENDER deployed.
Message received
Successfully received reply: Hi from consumer.

answered Sep 21 '25 23:09 by xuthus