 

How to encode/decode Kafka messages using Avro binary encoder?

I'm trying to use Avro for messages being read from/written to Kafka. Does anyone have an example of using the Avro binary encoder to encode/decode data that will be put on a message queue?

I need the Avro part more than the Kafka part. Or should I perhaps look at a different solution altogether? Basically, I'm trying to find a solution that is more space-efficient than JSON. Avro was just mentioned since it can be more compact than JSON.

blockcipher asked Nov 28 '11 at 15:11



1 Answer

This is a basic example. I have not tried it with multiple partitions/topics.

//Sample producer code

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;

public class ProducerTest {

    void producer(Schema schema) throws IOException {

        Properties props = new Properties();
        props.put("metadata.broker.list", "0:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

        GenericRecord payload1 = new GenericData.Record(schema);
        //Step2 : Put data in that genericrecord object
        payload1.put("desc", "'testdata'");
        //payload1.put("name", "अasa");
        payload1.put("name", "dbevent1");
        payload1.put("id", 111);
        System.out.println("Original Message : " + payload1);

        //Step3 : Serialize the object to a bytearray
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(payload1, encoder);
        encoder.flush();
        out.close();

        byte[] serializedBytes = out.toByteArray();
        System.out.println("Sending message in bytes : " + serializedBytes);
        //String serializedHex = Hex.encodeHexString(serializedBytes);
        //System.out.println("Serialized Hex String : " + serializedHex);
        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>("page_views", serializedBytes);
        producer.send(message);
        producer.close();
    }

    public static void main(String[] args) throws IOException, DecoderException {
        ProducerTest test = new ProducerTest();
        Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
        test.producer(schema);
    }
}

//Sample consumer code

Part 1: Consumer group code, since you can have multiple consumers for multiple partitions/topics.

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerGroupExample {

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
    }

    public void run(int a_numThreads) {
        //Make a map of topic as key and no. of threads for that topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        //Create message streams for each topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        //initialize thread pool
        executor = Executors.newFixedThreadPool(a_numThreads);
        //start consuming from thread
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}
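ConsumerGroupExample takes the ZooKeeper connect string, a consumer group id, the topic, and the thread count as command-line arguments (see main() above). For example, assuming ZooKeeper is running locally on its default port 2181 and using an arbitrary group id, an invocation could look like:

java ConsumerGroupExample localhost:2181 group1 page_views 4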

Part 2: Individual consumer that actually consumes the messages.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.commons.codec.binary.Hex;

import java.io.File;
import java.io.IOException;

public class ConsumerTest implements Runnable {

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            try {
                //System.out.println("Encoded Message received : " + message_received);
                //byte[] input = Hex.decodeHex(it.next().message().toString().toCharArray());
                //System.out.println("Deserializied Byte array : " + input);
                byte[] received_message = it.next().message();
                System.out.println(received_message);
                Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));
                DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null);
                GenericRecord payload2 = reader.read(null, decoder);
                System.out.println("Message received : " + payload2);
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(e);
            }
        }
    }
}

Test Avro schema (saved as src/test_schema.avsc):

{
    "namespace": "xyz.test",
    "type": "record",
    "name": "payload",
    "fields": [
        {
            "name": "name", "type": "string"
        },
        {
            "name": "id", "type": ["int", "null"]
        },
        {
            "name": "desc", "type": ["string", "null"]
        }
    ]
}
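Since the question asks mainly about the Avro part, here is a minimal sketch of the same encode/decode roundtrip without Kafka, using the schema above with Avro's generic datum writer/reader. The class name is just for illustration, and it assumes the schema file is at src/test_schema.avsc as in the code above.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;

public class AvroRoundTripTest {
    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(new File("src/test_schema.avsc"));

        // Build a record matching the schema
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "dbevent1");
        record.put("id", 111);
        record.put("desc", "testdata");

        // Encode to Avro binary
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        System.out.println("Encoded size in bytes : " + bytes.length);

        // Decode back to a GenericRecord
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        GenericRecord decoded = reader.read(null, decoder);
        System.out.println("Decoded record : " + decoded);
    }
}

Printing the encoded size gives a rough idea of how compact the Avro binary encoding is compared to the equivalent JSON string.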

Important things to note:

  1. You'll need the standard Kafka and Avro jars on the classpath to run this code out of the box.

  2. The line props.put("serializer.class", "kafka.serializer.DefaultEncoder"); is very important. Don't use StringEncoder, as that won't work if you are sending a byte array as the message.

  3. You can also convert the byte[] to a hex string, send that, and on the consumer convert the hex string back to byte[] and then decode it to the original message (see the sketch after this list).

  4. Run ZooKeeper and the broker as described in the quickstart (http://kafka.apache.org/documentation.html#quickstart) and create a topic called "page_views", or whatever you want.

  5. Run ProducerTest.java and then ConsumerGroupExample.java and see the Avro data being produced and consumed.
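Regarding point 3, here is a minimal sketch of that hex conversion using Apache Commons Codec (the same Hex class already imported in the producer example). The class name and sample bytes are just for illustration; in practice the bytes would be the Avro-serialized payload.

import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;

public class HexExample {
    public static void main(String[] args) throws DecoderException {
        // Stand-in for the Avro-serialized bytes produced by the encoder
        byte[] serializedBytes = {0x0C, 0x64, 0x62, 0x65, 0x76, 0x65, 0x6E, 0x74};

        // Producer side: convert the byte[] to a hex string before sending
        String serializedHex = Hex.encodeHexString(serializedBytes);
        System.out.println("Hex string sent : " + serializedHex);

        // Consumer side: convert the hex string back to byte[] before Avro decoding
        byte[] recoveredBytes = Hex.decodeHex(serializedHex.toCharArray());
        System.out.println("Recovered " + recoveredBytes.length + " bytes");
    }
}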

ramu answered Sep 18 '22 at 11:09
