Why does my channel.basicConsume not wait for messages?

Whenever I start the following code:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String exchangeName = "direct_logs";
    channel.exchangeDeclare(exchangeName, "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, "red");
    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException{
            String message = new String(body, "UTF-8");
            System.out.println(message);
            System.out.println("message received");
        }
    };

    channel.basicConsume(queueName, true, consumer);

It does not keep running in an endless loop, as the documentation implies. Instead, it stops right away. The only way I can make it consume for some time is to call channel.basicConsume repeatedly in a loop, as follows:

    DateTime startedAt = new DateTime();
    DateTime stopAt = startedAt.plusSeconds(60);
    long i=0;
    try {
        while (stopAt.compareTo(new DateTime()) > 0) {
            channel.basicConsume(queueName, true, consumer);
            i++;
        }
    }finally {
        System.out.println(new DateTime());
        System.out.println(startedAt);
        System.out.println(stopAt);
        System.out.println(i);
    }

There must be a better way to listen to messages for a while, correct? What am I missing? It stops listening right away.

AlexC asked Dec 24 '22 04:12


2 Answers

Are you sure it's stopping? basicConsume registers a consumer on a specific queue and returns immediately, so there is no need to call it in a loop. Call it once, and the handleDelivery method of the Consumer instance you pass will be called whenever a message arrives.

The threads that the RabbitMQ client library creates should keep the JVM from exiting. To exit the program, you have to call connection.close() explicitly.

Here is a complete receiver example from the RabbitMQ tutorials: https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java

It's actually pretty much the same as yours.

Nazaret K. answered Dec 28 '22 07:12


I had the same issue. The reason was that I was calling connection.close() at the end. However, basicConsume() does not block the current thread; messages are delivered on other threads, so the code after it, i.e. connection.close(), runs immediately.
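
To make that timing concrete, here is a minimal sketch that uses only the JDK. FakeChannel and consumeFor are hypothetical stand-ins invented for illustration and are not part of the RabbitMQ client; the sketch only assumes that registration returns at once and that deliveries happen on a separate thread, as described above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class NonBlockingConsumeSketch {

    /** Hypothetical stand-in for a channel; this is NOT the RabbitMQ API. */
    static class FakeChannel implements AutoCloseable {
        private final ExecutorService deliveryThread = Executors.newSingleThreadExecutor();

        /** Registers the callback and returns at once; deliveries run on another thread. */
        void basicConsume(String[] messages, Consumer<String> callback) {
            deliveryThread.submit(() -> {
                for (String message : messages) {
                    try {
                        Thread.sleep(100); // simulated delay before each message arrives
                    } catch (InterruptedException e) {
                        return; // the channel was closed while waiting
                    }
                    callback.accept(message);
                }
            });
        }

        /** Stops the delivery thread, like closing the connection stops consumption. */
        @Override
        public void close() {
            deliveryThread.shutdownNow();
        }
    }

    /** Registers once, waits up to waitMillis on the calling thread, then closes. */
    static int consumeFor(long waitMillis, String... messages) throws InterruptedException {
        AtomicInteger received = new AtomicInteger();
        CountDownLatch allDelivered = new CountDownLatch(messages.length);
        try (FakeChannel channel = new FakeChannel()) {
            channel.basicConsume(messages, message -> {
                received.incrementAndGet();
                allDelivered.countDown();
            });
            // Without this wait, close() would run before any message is handled.
            allDelivered.await(waitMillis, TimeUnit.MILLISECONDS);
        }
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("closing immediately: " + consumeFor(0, "a", "b", "c"));
        System.out.println("waiting before close: " + consumeFor(5000, "a", "b", "c"));
    }
}
```

Closing immediately should report no messages handled, while waiting first should report all three. With the real client the same shape applies: call channel.basicConsume once, keep the main thread waiting for as long as you want to listen (Thread.sleep or a latch both work), and only then call connection.close().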

manojmo answered Dec 28 '22 06:12