Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to deal with JSON message with spring-rabbit in spring boot application?

Here are my code snippets.

  • MQConfiguration class for configuration

    @Configuration
    public class MQConfiguration {
        @Bean
        public Receiver receiver() {
            return new Receiver();
        }
    }
    
  • Receiver class for dealing with receiving messages

    @RabbitListener(queues = "testMQ")
    public class Receiver {
    
        @RabbitHandler
        public void receive(Message msg){
            System.out.println(msg.toString());
        }
    }
    
  • And here is the JSON message I sent to the RabbitMQ

    {
        "id": 1,
        "name": "My Name",
        "description": "This is description about me"
    }
    

However I got following error message when I ran my application.

2017-02-28 17:16:35.931  WARN 11828 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.springframework.amqp.AmqpException: No method found for class [B
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:127) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:224) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:61) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    ... 10 common frames omitted

So what should I do if all I want is to print the JSON message in receive() method? I'd really appreciate that anyone can shed a light on this. :)

like image 477
kenshinji Avatar asked Feb 28 '17 09:02

kenshinji


People also ask

How to implement RabbitMQ flow in Spring Boot?

The Producer is an application that sends messages to the RabbitMQ broker and the Consumer is an application that reads messages from the RabbitMQ broker. In this tutorial, we will implement below Spring Boot RabbitMQ flow: Docker - Install and set up RabbitMQ locally as a Docker container using Docker.

Can we use JSON in Spring Boot?

2. REST Service Writing a JSON REST service in Spring Boot is simple, as that's its default opinion when Jackson is on the classpath:

How do I send JSON data in RabbitMQ?

By default, the message sent by RabbitMQ is converted to bytecode. Here's how to send JSON data. The easiest way to send JSON data is to use JSON tool classes such as ObjectMapper to convert the object to JSON format, and then send it. As follows:

How do I write a JSON REST service in Spring Boot?

Writing a JSON REST service in Spring Boot is simple, as that's its default opinion when Jackson is on the classpath: @RestController @RequestMapping ("/students") public class StudentController { @Autowired private StudentService service; @GetMapping ("/ {id}") public Student read(@PathVariable String id) { return service.find (id); } ...


2 Answers

In order to send JSON to RabbitMQ and consume it by Spring Boot we need to set content_type.

Let me describe with an example where I had a Python Producer and Java consumer (I was sending the JSON from Python to RabbitMQ and Spring Boot Java was supposed to receive the JSON task).

There are two solutions:

Solution 1: Sending as JSON string and convert it manually using Jakson or GSON

You need to set the content_type="text/plain" and convert the JSON to a string.Then in the Spring side, use a fuction with a string as the input as the listener and manually convert the object.

RabbitHandler:

@RabbitHandler
public void receive(String inputString) throws IOException {
    ObjectMapper objectMapper = new ObjectMapper();
    SimStatusReport theResult = objectMapper.readValue(inputString, SimStatusReport.class);

    System.out.println("String instance "  + theResult.toString() +
            " [x] Received");
}

SimStatusReport Object:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SimStatusReport {
    private String id;
    private int t;
}

Here is my Python Code:

import pika
import json
import uuid


connectionResult = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channelResult = connectionResult.channel()
routing_key_result = 'sim_results'
channelResult.queue_declare(queue=routing_key_result, durable=True)

def publish_result(sim_status):
    message =json.dumps(sim_status)
    channelResult.basic_publish(exchange='',
                                routing_key=routing_key_result,
                                body=message,
                                properties=pika.BasicProperties(
                                    content_type="text/plain",
                                    content_encoding= 'UTF-8',
                                    delivery_mode=2,  # make message persistent
                          ))
    print("Sent ", message)


newsim_status = {'id': str(uuid.uuid4()), 't': 0}
publish_result(newsim_status)

Solution 2: Sending JSON string and let the Jackson2JsonMessageConverter do the conversion for you automatically.

You need to set the content_type="application/json". Then you need to add the appropriate header to __TypeId__ in the header of the RabbitMQ request. You need to include the exact name space of the object so the Jackson undestand the the conversion.

Here is my example using Python (just the publish_result fuction):

def publish_result(sim_status):
    message =json.dumps(sim_status)
    channelResult.basic_publish(exchange='',
                                routing_key=routing_key_result,
                                body=message,
                                properties=pika.BasicProperties(
                                    content_type="application/json"
                                    headers={'__TypeId__': 'com.zarinbal.simtest.run.model.SimStatusReport'},
                                    content_encoding= 'UTF-8',
                                    delivery_mode=2,  # make message persistent
                          ))
    print("Sent ", message)

Then you need to configure the Java to use Jackson2JsonMessageConverter:

@Configuration
    public class RabbitConfiguration {
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }

Here would be your listener:

@RabbitListener(queues = "sim_results")
public class TaskReceiver {
    @RabbitHandler
    public void receive(SimStatusReport in) {
        System.out.println("Object instance "  + in +
                " [x] Received");
    }
}

Note: Make sure all you objects has setter and getters for all properties and all argument constructor. I use @Data, @NoArgsConstructor and @AllArgsConstructor from lombok to automatically generate it

like image 28
Amir Avatar answered Oct 07 '22 11:10

Amir


If you use Spring Boot, you just need to configure:

@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

Otherwise you have to configure:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
    return factory;
}

http://docs.spring.io/spring-amqp/docs/1.7.0.RELEASE/reference/html/_reference.html#async-annotation-driven

like image 82
Artem Bilan Avatar answered Oct 07 '22 09:10

Artem Bilan