Here are my code snippets.
MQConfiguration
class for configuration
@Configuration
public class MQConfiguration {
@Bean
public Receiver receiver() {
return new Receiver();
}
}
Receiver
class for dealing with receiving messages
@RabbitListener(queues = "testMQ")
public class Receiver {
@RabbitHandler
public void receive(Message msg){
System.out.println(msg.toString());
}
}
And here is the JSON message I sent to the RabbitMQ
{
"id": 1,
"name": "My Name",
"description": "This is description about me"
}
However I got following error message when I ran my application.
2017-02-28 17:16:35.931 WARN 11828 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.springframework.amqp.AmqpException: No method found for class [B
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:127) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:224) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:61) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
... 10 common frames omitted
So what should I do if all I want is to print the JSON message in receive()
method? I'd really appreciate that anyone can shed a light on this. :)
The Producer is an application that sends messages to the RabbitMQ broker and the Consumer is an application that reads messages from the RabbitMQ broker. In this tutorial, we will implement below Spring Boot RabbitMQ flow: Docker - Install and set up RabbitMQ locally as a Docker container using Docker.
2. REST Service Writing a JSON REST service in Spring Boot is simple, as that's its default opinion when Jackson is on the classpath:
By default, the message sent by RabbitMQ is converted to bytecode. Here's how to send JSON data. The easiest way to send JSON data is to use JSON tool classes such as ObjectMapper to convert the object to JSON format, and then send it. As follows:
Writing a JSON REST service in Spring Boot is simple, as that's its default opinion when Jackson is on the classpath: @RestController @RequestMapping ("/students") public class StudentController { @Autowired private StudentService service; @GetMapping ("/ {id}") public Student read(@PathVariable String id) { return service.find (id); } ...
In order to send JSON to RabbitMQ and consume it by Spring Boot we need to set content_type
.
Let me describe with an example where I had a Python Producer and Java consumer (I was sending the JSON from Python to RabbitMQ and Spring Boot Java was supposed to receive the JSON task).
There are two solutions:
Solution 1: Sending as JSON string and convert it manually using Jakson or GSON
You need to set the content_type="text/plain" and convert the JSON to a string.Then in the Spring side, use a fuction with a string as the input as the listener and manually convert the object.
RabbitHandler:
@RabbitHandler
public void receive(String inputString) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
SimStatusReport theResult = objectMapper.readValue(inputString, SimStatusReport.class);
System.out.println("String instance " + theResult.toString() +
" [x] Received");
}
SimStatusReport Object:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SimStatusReport {
private String id;
private int t;
}
Here is my Python Code:
import pika
import json
import uuid
connectionResult = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channelResult = connectionResult.channel()
routing_key_result = 'sim_results'
channelResult.queue_declare(queue=routing_key_result, durable=True)
def publish_result(sim_status):
message =json.dumps(sim_status)
channelResult.basic_publish(exchange='',
routing_key=routing_key_result,
body=message,
properties=pika.BasicProperties(
content_type="text/plain",
content_encoding= 'UTF-8',
delivery_mode=2, # make message persistent
))
print("Sent ", message)
newsim_status = {'id': str(uuid.uuid4()), 't': 0}
publish_result(newsim_status)
Solution 2: Sending JSON string and let the Jackson2JsonMessageConverter do the conversion for you automatically.
You need to set the content_type="application/json". Then you need to add the appropriate header to __TypeId__
in the header of the RabbitMQ request. You need to include the exact name space of the object so the Jackson undestand the the conversion.
Here is my example using Python (just the publish_result fuction):
def publish_result(sim_status):
message =json.dumps(sim_status)
channelResult.basic_publish(exchange='',
routing_key=routing_key_result,
body=message,
properties=pika.BasicProperties(
content_type="application/json"
headers={'__TypeId__': 'com.zarinbal.simtest.run.model.SimStatusReport'},
content_encoding= 'UTF-8',
delivery_mode=2, # make message persistent
))
print("Sent ", message)
Then you need to configure the Java to use Jackson2JsonMessageConverter:
@Configuration
public class RabbitConfiguration {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Here would be your listener:
@RabbitListener(queues = "sim_results")
public class TaskReceiver {
@RabbitHandler
public void receive(SimStatusReport in) {
System.out.println("Object instance " + in +
" [x] Received");
}
}
Note: Make sure all you objects has setter and getters for all properties and all argument constructor. I use @Data, @NoArgsConstructor and @AllArgsConstructor from lombok to automatically generate it
If you use Spring Boot, you just need to configure:
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
Otherwise you have to configure:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}
http://docs.spring.io/spring-amqp/docs/1.7.0.RELEASE/reference/html/_reference.html#async-annotation-driven
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With