 

Deserialize Kafka AVRO messages using Apache Beam

The main goal is to aggregate two Kafka topics: one compacted topic with slow-moving data, and one with fast-moving data that receives a message every second.

I have been able to consume messages in simple scenarios, such as KV<Long, String>, using something like:

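PCollection<KV<Long, String>> input = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("localhost:9092")  // hypothetical broker address
    .withTopic("my-topic")                   // hypothetical topic name
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata());                     // drop Kafka metadata, keep plain KV pairs

PCollection<String> output = input.apply(Values.<String>create());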

But this doesn't seem to work when the values have to be deserialized from Avro. I need to consume a KV(String, Avro) topic.
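Assuming the topic was written with Confluent's KafkaAvroSerializer (the updated code later in the question uses the matching io.confluent KafkaAvroDeserializer), the value bytes are not text. Each value starts with a magic byte 0 and a 4-byte big-endian schema ID, followed by the Avro binary-encoded record, so a StringDeserializer cannot recover the record. The JDK-only sketch below just splits a value into those parts; ConfluentFrame is a hypothetical name, and the real KafkaAvroDeserializer additionally fetches the schema from the Schema Registry by ID and decodes the payload.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper illustrating the Confluent Schema Registry framing:
// [magic byte 0][4-byte big-endian schema ID][Avro binary payload]
final class ConfluentFrame {
    static final byte MAGIC_BYTE = 0x0;

    final int schemaId;
    final byte[] avroPayload;

    private ConfluentFrame(int schemaId, byte[] avroPayload) {
        this.schemaId = schemaId;
        this.avroPayload = avroPayload;
    }

    // Splits a raw Kafka value into its schema ID and Avro payload.
    static ConfluentFrame parse(byte[] value) {
        if (value == null || value.length < 5) {
            throw new IllegalArgumentException("value too short for Confluent framing");
        }
        ByteBuffer buf = ByteBuffer.wrap(value); // ByteBuffer reads big-endian by default
        byte magic = buf.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte: " + magic);
        }
        int id = buf.getInt();
        byte[] payload = Arrays.copyOfRange(value, 5, value.length);
        return new ConfluentFrame(id, payload);
    }

    public static void main(String[] args) {
        // Hypothetical value: schema ID 42 followed by a 3-byte payload.
        byte[] value = {0, 0, 0, 0, 42, 2, 65, 0};
        ConfluentFrame f = parse(value);
        System.out.println(f.schemaId + " " + f.avroPayload.length); // prints "42 3"
    }
}
```

Because of this header, plain Avro decoding of the raw bytes also fails; the deserializer has to strip the header and resolve the writer schema first.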

I tried generating Java classes from the Avro schema and then using them in the "apply" call, for example:

PCollection<MyClass> output = input.apply(Values.<MyClass>create());

But this didn’t seem to be the correct approach.

Is there any documentation or example anyone could point me to that would help me understand how to work with Kafka, Avro, and Beam?

I have updated my code:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class Main {

    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();

        Pipeline p = Pipeline.create(options);

        PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
        );

        p.run();

    }
}

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Myclass {
    String name;
    String age;

    Myclass() {}
    Myclass(String n, String a) {
        this.name = n;
        this.age = a;
    }
}

But now I get the following error:

incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<java.lang.String>>

Am I importing the wrong serializers?

Chimmy asked Nov 01 '25 07:11



2 Answers

I faced the same issue and found the solution in this mailing-list archive: http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E

In your case, you need to define your own Deserializer<MyClass>, which can extend AbstractKafkaAvroDeserializer, as follows:

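import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class MyClassKafkaAvroDeserializer extends
  AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
      // Picks up schema.registry.url and related settings from the consumer config
      configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  public MyClass deserialize(String s, byte[] bytes) {
      // The cast only succeeds if a MyClass instance comes back, e.g. a generated
      // class with specific.avro.reader=true; otherwise a GenericRecord is returned
      return (MyClass) this.deserialize(bytes);
  }

  @Override
  public void close() {}
}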

Then specify MyClassKafkaAvroDeserializer as the value deserializer:

p.apply(KafkaIO.<Long, MyClass>read()
 .withKeyDeserializer(LongDeserializer.class)
 .withValueDeserializer(MyClassKafkaAvroDeserializer.class) );
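The compile error in the question comes from Java generics rather than from a wrong import: KafkaIO's withValueDeserializer and withValueDeserializerAndCoder take a Class<? extends Deserializer<V>>, while KafkaAvroDeserializer implements Deserializer<Object>, so its class literal does not match for V = String or V = MyClass. The JDK-only sketch below reproduces that shape with hypothetical stand-in types (Deserializer, AbstractObjectDeserializer, ObjectDeserializer, StringValueDeserializer and readWith are illustrations, not the Kafka or Beam classes) and shows why narrowing the type parameter in a subclass, as this answer does, makes the call compile.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for org.apache.kafka.common.serialization.Deserializer<T>, reduced to one method.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Stand-in for AbstractKafkaAvroDeserializer: does the decoding, returns Object.
abstract class AbstractObjectDeserializer {
    protected Object deserializeRaw(byte[] bytes) {
        // A real Avro deserializer would decode a record here; UTF-8 keeps the demo small.
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Stand-in for KafkaAvroDeserializer: only a Deserializer<Object>.
class ObjectDeserializer extends AbstractObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] data) {
        return deserializeRaw(data);
    }
}

// Same pattern as MyClassKafkaAvroDeserializer: fix the type parameter in a subclass.
class StringValueDeserializer extends AbstractObjectDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return (String) deserializeRaw(data);
    }
}

final class DeserializerDemo {
    // Same parameter type as KafkaIO.Read.withValueDeserializer(...); instantiates the class reflectively.
    static <V> V readWith(Class<? extends Deserializer<V>> cls, byte[] data) {
        try {
            Deserializer<V> d = cls.getDeclaredConstructor().newInstance();
            return d.deserialize("some-topic", data);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
        String s = readWith(StringValueDeserializer.class, bytes);
        System.out.println(s); // prints "hello"

        // Fails to compile with an incompatible-types error like the one in the question,
        // because Class<ObjectDeserializer> is not a Class<? extends Deserializer<String>>:
        // String t = DeserializerDemo.<String>readWith(ObjectDeserializer.class, bytes);
    }
}
```

The subclass adds no behavior; it only gives the compiler a Deserializer<MyClass> so that KafkaIO can also infer the element type V.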
Yohei Onishi answered Nov 03 '25 22:11



You can use KafkaAvroDeserializer as follows:

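PCollection<KV<Long, MyClass>> input = p.apply(KafkaIO.<Long, MyClass>read()
  .withKeyDeserializer(LongDeserializer.class)
  // Raw cast: KafkaAvroDeserializer is a Deserializer<Object>, not a Deserializer<MyClass>
  .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
  .withoutMetadata());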

Here, MyClass is the POJO class generated from the Avro schema.

Make sure your POJO class has the @DefaultCoder(AvroCoder.class) annotation, as in the example below:

@DefaultCoder(AvroCoder.class)
public class MyClass {
    String name;
    String age;

    MyClass() {}
    MyClass(String n, String a) {
        this.name = n;
        this.age = a;
    }
}
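AvroCoder encodes each element with Avro's binary encoding, using a schema derived from the class. For a class like MyClass with two String fields, assuming neither is null, that encoding is simply each field in declaration order as a zig-zag varint length followed by the UTF-8 bytes. The JDK-only sketch below hand-rolls that encoding to show what ends up on the wire; it is an illustration, not AvroCoder's implementation, TwoStringAvro is a hypothetical name, and null fields (which would need a union schema) are not handled.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of Avro binary encoding for a record {name: string, age: string}.
final class TwoStringAvro {

    // Avro ints/longs: zig-zag encode, then write 7 bits per byte, low bits first.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // high bit set: more bytes follow
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro strings: length prefix, then UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // A record is its fields concatenated in schema order, with no field names or tags.
    static byte[] encode(String name, String age) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);
        writeString(out, age);
        return out.toByteArray();
    }

    private final byte[] buf;
    private int pos; // current read offset

    private TwoStringAvro(byte[] buf) {
        this.buf = buf;
    }

    private long readLong() {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1); // undo zig-zag
    }

    private String readString() {
        int len = (int) readLong();
        String s = new String(buf, pos, len, StandardCharsets.UTF_8);
        pos += len;
        return s;
    }

    // Reads the two fields back in the same order they were written.
    static String[] decode(byte[] bytes) {
        TwoStringAvro r = new TwoStringAvro(bytes);
        String name = r.readString();
        String age = r.readString();
        return new String[] {name, age};
    }

    public static void main(String[] args) {
        byte[] bytes = encode("Ann", "30");
        System.out.println(Arrays.toString(bytes)); // prints "[6, 65, 110, 110, 4, 51, 48]"
        String[] fields = decode(bytes);
        System.out.println(fields[0] + " " + fields[1]); // prints "Ann 30"
    }
}
```

Since field names are not on the wire, the reader must know the writer's schema; AvroCoder gets it from the class, while KafkaAvroDeserializer gets it from the Schema Registry.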
Nishu Tayal answered Nov 03 '25 22:11
