Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Send an object using RabbitMQ

I Understand that this question duplicates question at using rabbitmq to send a message not string but struct

if to do this using the first way

first way

I have the following trace:

java.io.EOFException
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298)
at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78)
at com.mdnaRabbit.worker.App.main(App.java:41)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

I've checked and shure that message is transformd to bytes absolutely well in sender class, but the consumer can't receive it.

here is my producer class:

package com.mdnaRabbit.newt;

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.apache.commons.lang.SerializationUtils;
import com.mdnaRabbit.worker.data.Data;

public class App {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main( String[] argv) throws IOException{

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        int i = 0;

        do {
            Data message = getMessage();
            byte [] byteMessage = message.getBytes();
            //System.out.println(byteMessage);
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage);
            System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody());
            i++;
        } while (i<15);

        channel.close();
        connection.close();
    }

    private static Data getMessage(){
        Data data = new Data();
        data.setHeader("header");
        data.setDomainId("abc.com");
        data.setReceiver("me");
        data.setSender("he");
        data.setBody("body");
        return data;
    }

    private static String joinStrings(String[] strings, String delimiter){
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

here is my consumer class:

    package com.mdnaRabbit.worker;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.mdnaRabbit.worker.data.Data;
import org.apache.commons.lang.SerializationUtils;

public class App {

    private static final String TASK_QUEUE_NAME = "task_queue";
    private static int i = 0;
    public static void main( String[] argv )
            throws IOException,
            InterruptedException{

        ExecutorService threader = Executors.newFixedThreadPool(20);
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection(threader);
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(20);

        final QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

        try {

            while (true) {

                        try {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            Data message = Data.fromBytes(delivery.getBody());
                            //Data message = (Data) SerializationUtils.deserialize(delivery.getBody());

                            System.out.println(" [" + (i++) +"] Received" + message.getBody());

                            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        }catch (Exception e){
                        }
                    }
        } catch (Exception e){
            e.printStackTrace();
        }
        channel.close();
        connection.close();
    }

}

here is my Data class:

package com.mdnaRabbit.worker.data;

import java.io.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Data implements Serializable{

    public String header;
    public String body;
    public String domainId;
    public String sender;
    public String receiver;

    public void setHeader(String head){
        this.header = head;
    }

    public String getHeader(){
        return header;
    }

    public void setBody(String body){
        this.body = body;
    }

    public String getBody(){
        return body;
    }

    public void setDomainId(String domainId){
        this.domainId = domainId;
    }

    public String getDomainId(){
        return domainId;
    }

    public void setSender(String sender){
        this.sender = sender;
    }

    public String getSender(){
        return sender;
    }

    public String getReceiver(){
        return receiver;
    }

    public void setReceiver(String receiver){
        this.receiver = receiver;
    }


    public byte[] getBytes() {
        byte[]bytes;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try{
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(this);
            oos.flush();
            oos.reset();
            bytes = baos.toByteArray();
            oos.close();
            baos.close();
        } catch(IOException e){
            bytes = new byte[] {};
            Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e);
        }
        return bytes;
    }

    public static Data fromBytes(byte[] body) {
        Data obj = null;
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(body);
            ObjectInputStream ois = new ObjectInputStream(bis);
            obj = (Data) ois.readObject();
            ois.close();
            bis.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        return obj;
    }
}

I always seems that consumer receives messages, because when I'm not trying to transform it into the object and just write System.out.println(delivery.getBody) it shows bytes

like image 361
Nikitin Mikhail Avatar asked Mar 20 '13 12:03

Nikitin Mikhail


People also ask

Can RabbitMQ push messages?

Applications can subscribe to have RabbitMQ push enqueued messages (deliveries) to them. This is done by registering a consumer (subscription) on a queue. After a subscription is in place, RabbitMQ will begin delivering messages.

Does RabbitMQ push or pull?

RabbitMQ uses a push-based model with a smart producer, which means the producer decides when to push data. A prefetch limit is defined on the consumer to stop the producer from overwhelming consumers. Such a push-based approach is suited for low latency messaging.

Is RabbitMQ a message bus?

RabbitMQ is a messaging broker - an intermediary for messaging. It gives your applications a common platform to send and receive messages, and your messages a safe place to live until received.


1 Answers

It looks like the byte array you receive is empty. This happens because of this:

    } catch(IOException e){
        bytes = new byte[] {};
    }

When an exception is produced, the code doesn't warn you that something is broken and just sends an empty array instead. You should at least log the error.

The exception is being produced probably because you are trying to serialize a class that is not serializable. To make a class serializable you have to add "implements Serializable" to its declaration:

public class Data implements Serializable {
like image 105
Joni Avatar answered Sep 22 '22 08:09

Joni