
Apache Kafka Default Encoder Not Working

Tags:

apache-kafka

I am using Kafka 0.8 beta, and I am just experimenting with sending different objects, serializing them with my own encoder, and sending them to an existing broker configuration. For now I am trying to get just the DefaultEncoder working.

I have the broker and everything set up and working for the StringEncoder, but I cannot get any other data type, including a plain byte[], to be sent and received by the broker.

My code for the Producer is:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;
import java.util.Random;


public class ProducerTest {
    public static void main(String[] args) {
        long events = 5;
        Random rnd = new Random();
        rnd.setSeed(new Date().getTime());
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094");
        // DefaultEncoder passes byte[] through untouched.
        props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
        props.setProperty("partitioner.class", "example.producer.SimplePartitioner");
        props.setProperty("request.required.acks", "1");
        // Async mode batches sends on a background thread.
        props.setProperty("producer.type", "async");
        props.setProperty("batch.num.messages", "4");

        ProducerConfig config = new ProducerConfig(props);
        Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            byte[] a = "Hello".getBytes();
            byte[] b = "There".getBytes();

            KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b);
            producer.send(data);
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

I used the same SimplePartitioner as in the example given here, and replacing all the byte arrays with Strings and changing the serializer to kafka.serializer.StringEncoder works perfectly.

For reference, SimplePartitioner:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner<String> {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(String key, int a_numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }

}

What am I doing wrong?

Asked Sep 26 '13 by laughing_man


1 Answer

The problem is that the partitioning class, SimplePartitioner, is applicable only to Strings. When the Producer runs asynchronously, it creates a separate thread that handles the encoding and partitioning before sending to the broker. That thread hits a ClassCastException when SimplePartitioner receives a byte[] key instead of a String, but because it is a separate thread, the exception never propagates back to the caller; the thread simply exits without any indication of failure. (Running with producer.type=sync makes the exception surface in the calling thread, which makes this much easier to debug.)
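The silent type mismatch can be reproduced outside Kafka. The sketch below (hypothetical names, not Kafka source) shows why a Partitioner<String> blows up on byte[] keys: generics are erased at runtime, so the mismatch only surfaces as a ClassCastException when partition() is actually invoked through the raw interface, which is effectively the call the async sender thread makes after loading the partitioner reflectively.

```java
// Hypothetical illustration, not Kafka source: why a Partitioner<String>
// fails at runtime when handed a byte[] key.
public class ErasureDemo {
    // Mirrors the shape of kafka.producer.Partitioner<T> in 0.8 beta.
    interface Partitioner<T> {
        int partition(T key, int numPartitions);
    }

    static class StringPartitioner implements Partitioner<String> {
        public int partition(String key, int numPartitions) {
            return key.length() % numPartitions;
        }
    }

    // Kafka instantiates the partitioner reflectively, so the call site is
    // effectively raw-typed; the compiler-inserted cast to String is what fails.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean failsOnByteArrayKey() {
        Partitioner p = new StringPartitioner();
        try {
            p.partition("Hello".getBytes(), 2); // byte[] where String is expected
            return false;
        } catch (ClassCastException e) {
            return true; // this is what kills the async sender thread silently
        }
    }

    public static void main(String[] args) {
        System.out.println("partitioner failed on byte[] key: " + failsOnByteArrayKey());
    }
}
```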

If we change the SimplePartitioner to accept byte[], for instance:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner<byte[]> {
    public SimplePartitioner (VerifiableProperties props) {

    }

    public int partition(byte[] key, int a_numPartitions) {
        // Every message lands in partition 0, which is fine for this test.
        return 0;
    }

}

This works perfectly now.
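Note that always returning 0 funnels every message into a single partition. A possible refinement (my own sketch, not from the original answer) is to hash the key bytes so messages spread across partitions; only the partition arithmetic is shown here, independent of the Kafka classes:

```java
import java.util.Arrays;

// Hypothetical sketch: partition a byte[] key by hashing its contents
// instead of pinning everything to partition 0.
public class BytesPartitioning {
    static int partition(byte[] key, int numPartitions) {
        if (key == null || key.length == 0) {
            return 0;
        }
        // Arrays.hashCode may be negative; mask the sign bit before the modulo
        // so the result is always a valid partition index.
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partition("Hello".getBytes(), 4));
    }
}
```

The same body could be dropped into the partition(byte[], int) method of the byte[]-typed SimplePartitioner above.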

Answered Oct 20 '22 by laughing_man