Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Avro decoding gives java.io.EOFException

I use Apache avro schema with Kafka 0.0.8V. I Use same schema at producer/consumer ends. There is NO ANY Changes in the schema. But i get some exception at the consumer, when i try to consume the messages. Why i get this error?

Producer

public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
    BinaryEncoder encoder = null;
    ByteArrayOutputStream out = null;
    try {
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(payload, encoder);
        encoder.flush();

        byte[] serializedBytes = out.toByteArray();

        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);

            producer.send(message);
        }

Consumer

public void run() {
        try {
            ConsumerIterator<byte[], byte[]> itr = stream.iterator();
            while (itr.hasNext()) {

                byte[] data = itr.next().message();

                Schema schema = new Schema.Parser()
                        .parse(new File("/Users/xx/avro_schemas/file.avsc"));

                DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

                GenericRecord payload = reader.read(null, decoder);
                System.out.println("Message received --: " + payload);

But I get following exception when the reader try to read message from the decoder.;

java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.xx.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Consumer properties

enable.auto.commit=true
auto.commit.interval.ms=101
session.timeout.ms=7000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.connect=zookeeper.xx.com\:2181
heartbeat.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.DefaultEncoder
bootstrap.servers=kafka.xx.com\:9092
group.id=test
consumer.timeout.ms=-1
fetch.min.bytes=1
receive.buffer.bytes=262144
like image 626
Ratha Avatar asked Apr 18 '16 03:04

Ratha


1 Answers

The problem is produced by your AVRO producer.

In the sendFile() method, you are not flushing encoder, and not closing the ByteArrayOutputStream(), causing the EOFException.

Here you have an example of a generic serialization class:

public class TestSerializer<T> {



    final private Class<T> avroType;

    public TestSerializer(Class<T> avroType) {
        this.avroType = avroType;
    }

    public byte[] serialize(T object)
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        DatumWriter<T> writer = new SpecificDatumWriter<T>(avroType);
        try
        {
            writer.write(object, encoder);
            out.close();
        } catch (IOException e)
        {
            throw new RuntimeException(e);
        } finally
        {
            //Here is the flushing and closing
            try
            {
                if (encoder != null)
                {
                    encoder.flush();
                }
                if (out != null)
                {
                    out.close();
                }
            } catch (IOException e)
            {
                throw new RuntimeException(e);
            }
        }

        return out.toByteArray();

    }

}
like image 153
Jose F. Gomez Avatar answered Nov 20 '22 07:11

Jose F. Gomez