 

flink kafka consumer groupId not working

I am using Kafka with Flink. In a simple program, I used Flink's FlinkKafkaConsumer09 and assigned a group id to it.

According to Kafka's behavior, when I run 2 consumers on the same topic with the same group.id, they should work like a message queue. I think it is supposed to work like this: if 2 messages are sent to Kafka, the two Flink programs together should process those 2 messages exactly once, split between them (let's say 2 lines of output in total).

But the actual result is that each program receives both messages.

I have tried the consumer client that comes with the Kafka server download; it worked in the documented way (2 messages processed in total). A sketch of that setup follows this list.
I tried using 2 Kafka consumers in the same main function of a Flink program: 4 messages processed in total.
I also tried running 2 instances of Flink and giving each of them the same Kafka consumer program: 4 messages.
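
Here is a minimal Java sketch of what I mean by the plain Kafka consumer setup (the broker address, topic name, and group name are placeholders, and the topic has at least 2 partitions). Running two copies of this with the same group.id gives me the queue-like split, i.e. 2 lines of output in total:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PlainKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "my-test-group");           // same group.id for both copies
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe() uses Kafka's group management, so the partitions are
        // split between all consumers sharing the same group.id
        consumer.subscribe(Collections.singletonList("test-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Plain consumer says: " + record.value());
            }
        }
    }
}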

Any ideas? This is the output I expect:

1> Kafka and Flink2 says: element-65  
2> Kafka and Flink1 says: element-66 

Here's the wrong output I always get:

1> Kafka and Flink2 says: element-65  
1> Kafka and Flink1 says: element-65  
2> Kafka and Flink2 says: element-66  
2> Kafka and Flink1 says: element-66 

And here is the segment of code:

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ParameterTool parameterTool = ParameterTool.fromArgs(args);

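    // Kafka source: the topic and the consumer properties (including group.id) come from the command-line arguments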
    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

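    // rebalance() redistributes the records round-robin across the parallel instances of the map operator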
    messageStream.rebalance().map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return "Kafka and Flink1 says: " + value;
        }
    }).print();


    env.execute();
}

I have tried running it twice, and also the other way: creating 2 DataStreams and calling env.execute() for each of them in the main function.

asked Jul 28 '16 by PleaseLetMeGo


1 Answer

There was a quite similar question on the Flink user mailing list today, but I can't find the link to post it here. So here is part of the answer:

"Internally, the Flink Kafka connectors don’t use the consumer group management functionality because they are using lower-level APIs (SimpleConsumer in 0.8, and KafkaConsumer#assign(…) in 0.9) on each parallel instance for more control on individual partition consumption. So, essentially, the “group.id” setting in the Flink Kafka connector is only used for committing offsets back to ZK / Kafka brokers."

Maybe that clarifies things for you.
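
To make the quoted explanation a bit more concrete, here is a minimal sketch of the two Kafka 0.9 client APIs involved. This is not the connector's actual code; the broker address, topic name, partition numbers, and group name are just placeholders:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-test-group"); // with assign(), only used for committing offsets
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Group management: the group coordinator splits the partitions among
        // all consumers sharing the same group.id -- the queue-like behaviour
        // you see with the plain consumer client.
        // consumer.subscribe(Arrays.asList("test-topic"));

        // Manual assignment, roughly what each parallel instance of the Flink
        // connector does: no rebalancing, the process reads exactly the
        // partitions it assigns itself, regardless of group.id.
        consumer.assign(Arrays.asList(
                new TopicPartition("test-topic", 0),
                new TopicPartition("test-topic", 1)));
    }
}

So if you start the same Flink job twice, each job assigns itself all partitions of the topic and reads every message, which is exactly the duplicated output you observed.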

Also, there is a blog post about working with Flink and Kafka that may help you (https://data-artisans.com/blog/kafka-flink-a-practical-how-to).

answered by Claudi