
How to get all the messages in a topic from a Kafka server

I would like to get all the messages in a topic, from the beginning, from the server.

Ex:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning

The console command above returns all the messages in the topic from the beginning, but I couldn't consume all the messages from the beginning using Java code.

gsc0441 asked Aug 12 '16 20:08



4 Answers

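You can read the messages from the beginning using the following command. Note that --max-messages 100 stops after 100 messages; omit it to read the whole topic: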

cd Users/kv/kafka/bin

./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic topicName --from-beginning --max-messages 100
KayV answered Oct 20 '22 06:10


The easiest way would be to start a consumer and drain all the messages. I don't know how many partitions your topic has or whether you already have an existing consumer group, but you have a few options:

Have a look at this API: https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

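1) If you already have a consumer in the same consumer group and still want to start consuming from the beginning, use the seek option listed in the API doc to reset the offset of every partition assigned to each consumer in the group. Offset 0 is the earliest offset only while no records have been deleted by retention; seekToBeginning targets the earliest offset that is still available.

2) Otherwise, you can start a few consumers in a new consumer group and you would not have to worry about seek, provided auto.offset.reset is set to earliest (the default, latest, makes a new group start at the end of the topic).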

PS: Please remember to provide more details about your setup in the future if you have more questions on Kafka. A lot of things depend on how you have configured your infrastructure & how you would prefer it to be and would thus vary from case to case.

Manav Garg answered Oct 20 '22 06:10


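// `consumer` is an existing KafkaConsumer and `topic` the topic name.
// Only partition 0 is read here; add one TopicPartition per partition for larger topics.
TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Arrays.asList(topicPartition);
consumer.assign(partitions);           // manual assignment, no group rebalancing
consumer.seekToBeginning(partitions);  // evaluated lazily, on the next poll() or position()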
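
After seekToBeginning, the remaining question is when to stop polling, since a Kafka topic has no end-of-stream marker. One common approach is to record each partition's end offset once at startup and stop when the consumer's position has reached it everywhere. The sketch below shows only that comparison, using plain maps so it runs without a broker; the class and method names are hypothetical. In a real consumer the two maps would presumably be filled from KafkaConsumer.endOffsets(partitions) (available in newer clients) and KafkaConsumer.position(partition).

```java
import java.util.HashMap;
import java.util.Map;

final class CatchUpCheck {

    // endOffsets: partition number -> offset of the next record to be written,
    //             captured once before polling starts.
    // positions:  partition number -> offset of the next record the consumer will read.
    // Returns true when every partition has been read up to its recorded end offset.
    static boolean caughtUp(Map<Integer, Long> endOffsets, Map<Integer, Long> positions) {
        for (Map.Entry<Integer, Long> end : endOffsets.entrySet()) {
            // A partition with no known position counts as position 0.
            long position = positions.getOrDefault(end.getKey(), 0L);
            if (position < end.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, 3L);
        endOffsets.put(1, 5L);

        Map<Integer, Long> positions = new HashMap<>();
        positions.put(0, 3L);
        positions.put(1, 4L);
        System.out.println(caughtUp(endOffsets, positions)); // false: partition 1 is one record behind

        positions.put(1, 5L);
        System.out.println(caughtUp(endOffsets, positions)); // true
    }
}
```

Records produced after the end offsets were captured are ignored by this check, which is usually what "all messages so far" means.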
gsc0441 answered Oct 20 '22 05:10


Just change the consumer group: set

ConsumerConfig.GROUP_ID_CONFIG - to a new group id

and set

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG - to earliest

Sample code:

    props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
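
For a fuller picture, here is a minimal sketch of a complete property set built on the same idea. It uses the plain string keys that the ConsumerConfig constants stand for, so it compiles without the Kafka client on the classpath. The class name, the group-id prefix, the random suffix, and the choice of String deserializers are assumptions for illustration, not part of the answer above.

```java
import java.util.Properties;
import java.util.UUID;

final class FromBeginningConsumerProps {

    // Builds consumer settings under which a brand-new group starts at the earliest offset.
    // bootstrapServers and groupPrefix are caller-supplied placeholders.
    static Properties build(String bootstrapServers, String groupPrefix) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A fresh group id has no committed offsets, so auto.offset.reset applies.
        props.put("group.id", groupPrefix + "-" + UUID.randomUUID());
        // Consulted only when the group has no committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        // Skip automatic offset commits so this replay leaves no committed offsets behind.
        props.put("enable.auto.commit", "false");
        // Assumes keys and values are UTF-8 strings.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "replay");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor; after subscribing to the topic, the first poll should then start from the earliest retained offset of each partition.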
Dinuka Hettiarachchi answered Oct 20 '22 07:10