
Delay in Consumer consuming messages in Apache Kafka

I am using Kafka 0.8.0 and trying to achieve the scenario below.

JCA API (Acts as a producer and sends data to)-----> Consumer------> HBase

I send each message to the consumer as soon as I fetch the data using the JCA client. For instance, as soon as the producer sends message no. 1, I want the consumer to fetch it and 'put' it in HBase. But my consumer only starts fetching messages after some random number n of messages has been sent. I want to keep the producer and consumer in sync so that both of them start working together.

I have used:

1 broker

1 single topic

1 single producer and high level Consumer

Can anyone suggest what I need to do to achieve this?

EDITED:

Adding some relevant code snippet.

Consumer.java

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
                new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();

    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");

        return new ConsumerConfig(props);

    }

    synchronized public void run() {

        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}

Producer.java

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic)
    {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use the random partitioner; the key is not needed for partitioning.
        // The message value is a user-defined object (Signal).
        producer = new kafka.javaapi.producer.Producer<String, Signal>(new ProducerConfig(props));
        this.topic = topic;
    }
}

KafkaProperties.java

public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}

This is how the consumer behaves: for about the first 10 messages it does not print the "received" line at all, but after that it starts functioning correctly.

     producer sending msg1

     producer sending msg2

     producer sending msg3

     producer sending msg4

     producer sending msg5

     producer sending msg6

     producer sending msg7

     producer sending msg8

     producer sending msg9

     producer sending msg10

     producer sending msg11

     producer sending msg12
     In Consumer received msg12

     producer sending msg13
     In Consumer received msg13

     producer sending msg14
     In Consumer received msg14

     producer sending msg15
     In Consumer received msg15

     producer sending msg16
     In Consumer received msg16

     producer sending msg17
     In Consumer received msg17

     producer sending msg18
     In Consumer received msg18

     producer sending msg19
     In Consumer received msg19

     producer sending msg20
     In Consumer received msg20

     producer sending msg21
     In Consumer received msg21

EDITED: Adding the listener function where the producer sends messages to the consumer. I am using the default producer config and did not override it.

public synchronized void onValueChanged(final MonitorEvent event_) {


    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();

        final String[] val = (String[]) dbr.getValue();

        producer1.producer.send(new KeyedMessage<String, Signal>(
                KafkaProperties.topic, new Signal(messageNo)));
        System.out.println("producer sending msg"+messageNo);

        messageNo++;


    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
asked Feb 12 '14 11:02 by Ankita



1 Answer

  1. Try adding props.put("request.required.acks", "1") to the producer configuration. By default the producer doesn't wait for acks, so message delivery is not guaranteed. If you start the broker just before your test, the producer may start sending messages before the broker is fully initialized, and the first several messages may be lost.

  2. Try adding props.put("auto.offset.reset", "smallest") to the consumer configuration. It is equivalent to the --from-beginning option of kafka-console-consumer.sh. If your consumer starts later than the producer and there is no offset data saved in ZooKeeper, then by default it will start consuming only new messages (see Consumer configs in the docs).
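
To make the two suggestions concrete, here is a minimal sketch of how both property sets might look once the extra settings are added. It only builds java.util.Properties objects, so it compiles without Kafka on the classpath. The class name KafkaConfigSketch and its two method names are hypothetical; the property keys and values come from the question and from the two points above.

```java
import java.util.Properties;

// Sketch only: property sets for the Kafka 0.8 producer and high-level consumer.
// The class and method names are hypothetical, not part of the original code.
final class KafkaConfigSketch {

    // Producer settings from the question, plus request.required.acks=1 so that
    // send() waits for the partition leader to acknowledge each message.
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", brokerList);
        props.put("request.required.acks", "1");
        return props;
    }

    // Consumer settings from the question, plus auto.offset.reset=smallest so that
    // a group with no offset saved in ZooKeeper starts from the earliest message.
    static Properties consumerProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        return props;
    }
}
```

In the question's code these would be passed to new ProducerConfig(...) and new ConsumerConfig(...) in place of the hand-built props. Note that auto.offset.reset only applies when the group has no stored offset, so an existing group such as "group1" would presumably need a fresh group.id (or its offsets cleared) before the change has any visible effect.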

answered Sep 28 '22 05:09 by Dmitry