Apache Camel, RabbitMQ: how to send messages/objects

I hope someone can provide some help on this matter.

I am using the Camel RabbitMQ component and, for testing purposes, I am trying to send a message to a queue, view it in the RabbitMQ management interface, and then read it back.

However I can't get this working.

What I believe works is this: in the Exchanges tab of the RabbitMQ management interface I created a new exchange, and my Java code sends the message to that exchange. When the code runs, I can see a spike in the web interface showing that something has been received, but I can't see what it was. When I try to read the message back, it fails with the following error: < in route: Route(route2)[[From[rabbitmq://192.168.59.103:5672/rt... because of Route route2 has no output processors. You need to add outputs to the route such as to("log:foo").

Can someone provide a practical example of how to send a message, see it in the web interface, and read it back? Any tutorial showing this process would also be appreciated.

Thank you

================= SECOND PART

The error I'm getting now is the following:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot redeclare exchange 'rhSearchExchange' in vhost '/' with different type, durable, internal or autodelete value, class-id=40, method-id=10), null, ""}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 47 more

I believe I'm doing something wrong with the URI and am missing some extra parameters. I have the following settings:

My exchange is of direct type.
My queue is durable.
My URI is: rabbitmq://192.168.59.105:5672/rhSearchExchange?username=guest&password=guest&routingKey=rhSearchQueue

Any input on this?

Thanks

cpu2007 asked Oct 20 '14 09:10


2 Answers

I was able to figure this out yesterday; I had the same (or at least similar) problems you were having.

The options you have in the RabbitMQ URI must exactly match the options that your exchange was created with. For example, in my configuration, I had an exchange called tasks that was a direct type, was durable, and was not configured to autodelete. Note that the default value for the autodelete option in the rabbitmq camel component is true. Additionally, I wanted to get the messages with the routing key camel. That means my rabbitmq URI needed to look like:

rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel
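To illustrate why leaving out autoDelete=false causes a mismatch, here is a minimal, JDK-only sketch. It does not call Camel or RabbitMQ. The class AutoDeleteCheck and its methods are hypothetical names used only for illustration. It assumes, as noted above, that the component falls back to autoDelete=true when the option is absent from the URI.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Camel or the RabbitMQ client: it only
// illustrates how an option missing from the URI falls back to a default.
class AutoDeleteCheck {

    // Parses the part after '?' into key/value pairs (no URL decoding).
    static Map<String, String> queryOptions(String uri) {
        Map<String, String> options = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0) {
            return options;
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    // The autoDelete value that would apply: the URI option if present,
    // otherwise the default of true described in this answer.
    static boolean effectiveAutoDelete(String uri) {
        return Boolean.parseBoolean(queryOptions(uri).getOrDefault("autoDelete", "true"));
    }

    public static void main(String[] args) {
        String withoutOption = "rabbitmq:localhost:5672/tasks?username=guest&password=guest&routingKey=camel";
        String withOption = "rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel";
        boolean exchangeAutoDelete = false; // how the 'tasks' exchange was created

        // Does the URI agree with the existing exchange?
        System.out.println(effectiveAutoDelete(withoutOption) == exchangeAutoDelete); // false
        System.out.println(effectiveAutoDelete(withOption) == exchangeAutoDelete);    // true
    }
}
```

The first URI disagrees with the existing exchange, which is the kind of mismatch the broker rejects with PRECONDITION_FAILED when the exchange is redeclared.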

Additionally, I wanted to read from an existing queue called task_queue, rather than have the rabbitmq camel component declare its own queue. Therefore, I also needed to add an additional query parameter, so my rabbitmq URI was:

rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel&queue=task_queue

This configuration worked for me. Below are Java snippets from the code that configures the exchange and queue and sends a message, followed by my Camel route configuration.

Exchange and Queue configuration:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

final ConnectionFactory rabbitConnFactory = new ConnectionFactory();
rabbitConnFactory.setHost("localhost");
final Connection conn = rabbitConnFactory.newConnection();
final Channel channel = conn.createChannel();

// declare a direct, durable, non autodelete exchange named 'tasks'
channel.exchangeDeclare("tasks", "direct", true);
// declare a durable, non exclusive, non autodelete queue named 'task_queue'
channel.queueDeclare("task_queue", true, false, false, null);
// bind 'task_queue' to the 'tasks' exchange with the routing key 'camel'
channel.queueBind("task_queue", "tasks", "camel");

Sending a message:

channel.basicPublish("tasks", "camel", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello, world!".getBytes());

Camel Route:

@Override
public void configure() throws Exception {
    from("rabbitmq:localhost:5672/tasks?username=guest&password=guest&autoDelete=false&routingKey=camel&queue=task_queue")
        .to("mock:result");
}
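
One way to keep the URI from drifting away from the exchange declaration is to assemble it in one place from named options. This is a minimal, JDK-only sketch. RabbitUriSketch and endpointUri are hypothetical names, not part of Camel, and the sketch assumes simple option values that need no URL encoding. The option names are the ones used in the URIs above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Camel): assembles an endpoint URI in the
// same form as the URIs shown in this answer.
class RabbitUriSketch {

    // Joins the options as key=value pairs in insertion order.
    // Values are used as-is; nothing is URL-encoded.
    static String endpointUri(String host, int port, String exchange, Map<String, String> options) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "rabbitmq:" + host + ":" + port + "/" + exchange + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("username", "guest");
        options.put("password", "guest");
        options.put("autoDelete", "false"); // must match how 'tasks' was declared
        options.put("routingKey", "camel");
        options.put("queue", "task_queue");

        System.out.println(endpointUri("localhost", 5672, "tasks", options));
    }
}
```

With these values the sketch produces the same URI string that is passed to from(...) in the route above.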

I hope this helps!

Jeff answered Oct 12 '22 13:10


Because this is the top hit on Google for rabbitmq/camel integration, I feel the need to add a bit more to the subject. The lack of simple camel examples is astonishing to me.

import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.Test;

public class CamelTests {
    CamelContext context;
    ProducerTemplate producer;
    ConsumerTemplate consumer;
    Endpoint endpoint;

    @Test
    public void camelRabbitMq() throws Exception {
        context = new DefaultCamelContext();

        context.start();

        endpoint = context.getEndpoint("rabbitmq://192.168.56.11:5672/tasks?username=benchmark&password=benchmark&autoDelete=false&routingKey=camel&queue=task_queue");

        producer = context.createProducerTemplate();

        producer.setDefaultEndpoint(endpoint);
        producer.sendBody("one");
        producer.sendBody("two");
        producer.sendBody("three");
        producer.sendBody("four");
        producer.sendBody("done");

        consumer = context.createConsumerTemplate();
        String body = null;
        while (!"done".equals(body)) {
            Exchange receive = consumer.receive(endpoint);
            body = receive.getIn().getBody(String.class);
            System.out.println(body);
        }

        context.stop();

    }

}
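
The receive loop in the test above waits until the body "done" arrives, so it would block indefinitely if that message never showed up. The following JDK-only sketch shows the same read-until-"done" loop with a timeout, using an in-memory queue as a stand-in for the broker. SentinelLoopSketch and drainUntilDone are hypothetical names, and nothing here talks to Camel or RabbitMQ. With Camel itself, a receive call that takes a timeout and may return null could play the role of poll here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the broker; illustration only.
class SentinelLoopSketch {

    // Collects messages until the "done" marker arrives or a poll times out.
    static List<String> drainUntilDone(BlockingQueue<String> queue, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        String body = null;
        while (!"done".equals(body)) {
            try {
                body = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
            if (body == null) {
                break; // nothing arrived in time; stop instead of waiting forever
            }
            received.add(body);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (String message : new String[] {"one", "two", "three", "four", "done"}) {
            queue.add(message);
        }
        System.out.println(drainUntilDone(queue, 100)); // [one, two, three, four, done]
    }
}
```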

slf answered Oct 12 '22 14:10