I would like to get all the messages from beginning in a topic from server.
Ex:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning
When using the above console command, I would like to able to get all messages in a topic from the beginning but I couldn't consume all the messages in a topic from beginning using java code.
In order to consume all the messages of a Kafka topic using the console consumer, we simply need to pass the --from-beginning option so that the consumer will fetch messages from the earliest offset available: The equivalent command for kafkacat is also shown below.
Now you can list all the available topics by running the following command: kafka-topics --bootstrap-server localhost:9092 --list Alternatively, you can also use your Apache Zookeeper endpoint. This can be considered legacy as Apache Kafka is deprecating the use of Zookeeper as new versions are being released.
Here's how the lifecycle of a message works when expiry conditions have been enabled: a message is sent to a Kafka cluster by a producer and appended to the end of a topic, consumers process the topic and read the message, the message stays in the topic until the expiration conditions met, after which it is removed.
You can also use Kafkacat to count the no. of messages or events. Let’s say you have created a schema of Kafka Topic “Test1” in the KsqlDB. Next based on whatever value you store in the flag ‘auto.offset.reset’ , accordingly it will consider the messages to be considered. “earliest” will indicate to read all the messages.
You can get all messages using the following command:
cd Users/kv/kafka/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic topicName --from-beginning --max-messages 100
The easiest way would be to start a consumer and drain all the messages. Now I don't know how many partitions you have in your topic and whether you already have a an existing consumer group or not, but you have a few options:
Have a look at this API: https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
1) If you already have a consumer in the same consumer group, and still want to start consuming from the beginning, you should use the seek
option listed in the API doc and set the offset to 0 for each consumer in the group. This would start consuming from the beginning.
2) Otherwise, you can start a few consumers in a new consumer group & you would not have to worry about seek.
PS: Please remember to provide more details about your setup in the future if you have more questions on Kafka. A lot of things depend on how you have configured your infrastructure & how you would prefer it to be and would thus vary from case to case.
TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> partitions = Arrays.asList(topicPartition);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
Just change the consumer group
ConsumerConfig.GROUP_ID_CONFIG - to new group id
and set
AUTO_OFFSET_RESET_CONFIG - earliest
sample code-
props.put(ConsumerConfig.GROUP_ID_CONFIG, "newID");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With