I am using the KafkaConsumer 0.10 Java API. I want to consume from a specific partition, starting at a specific offset. I looked it up and found that there is a seek method, but it's throwing an exception. Has anyone had a similar use case or found a solution?
Code:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
Exception:
java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
    at xx.xxx.xxx.Test.main(Test.java:182)
It is also possible for the consumer to manually assign specific partitions (similar to the older "simple" consumer) using assign(Collection) . In this case, dynamic partition assignment and consumer group coordination will be disabled.
Kafka partitioning: Partitioning takes the single topic log and breaks it into multiple logs, each of which can live on a separate node in the Kafka cluster. This way, the work of storing messages, writing new messages, and processing existing messages can be split among many nodes in the cluster.
The poll() method is the function a Kafka consumer calls to retrieve records from a given topic. When calling the poll() method, consumers provide a timeout argument. This is the maximum amount of time to wait for records to process before returning.
Before you can seek(), you first need to subscribe() to a topic or assign() a partition of a topic to the consumer. Also keep in mind that subscribe() and assign() are lazy; thus, you also need to do a "dummy call" to poll() before you can use seek().
Note: as of Kafka 2.0, the new poll(Duration timeout) is async, and it's not guaranteed that you have a complete assignment when poll returns. Thus, you might need to check your assignment before using seek() and also poll again to refresh the assignment. (Cf. KIP-266 for details.)
If you use subscribe(), you use group management: thus, you can start multiple consumers using the same group.id, and all partitions of the topic will be assigned evenly over all consumers within the group automatically (each partition will get assigned to a single consumer in the group).
If you want to read specific partitions, you need to use manual assignment via assign(). This allows you to do any assignment you want.
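To make the failure in the question concrete, here is a toy model of the check that produces "No current assignment for partition". This is not Kafka code: ToyAssignmentState and its methods are hypothetical names invented for illustration, and the real client's internals are more involved. It only sketches the idea that seek() is rejected for a partition the consumer has not been assigned yet.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the consumer's assignment bookkeeping.
// NOT Kafka code: class and method names are hypothetical.
class ToyAssignmentState {
    // partition name (e.g. "mytopic-1") -> next offset to read
    private final Map<String, Long> positions = new HashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Manual assignment: the partition becomes known to the consumer.
    public void assign(String topic, int partition) {
        positions.put(key(topic, partition), 0L);
    }

    // Seeking is only legal for a partition that is currently assigned.
    public void seek(String topic, int partition, long offset) {
        String k = key(topic, partition);
        if (!positions.containsKey(k)) {
            throw new IllegalStateException("No current assignment for partition " + k);
        }
        positions.put(k, offset);
    }

    // Returns the offset the next read would start from.
    public long position(String topic, int partition) {
        String k = key(topic, partition);
        Long offset = positions.get(k);
        if (offset == null) {
            throw new IllegalStateException("No current assignment for partition " + k);
        }
        return offset;
    }

    public static void main(String[] args) {
        ToyAssignmentState state = new ToyAssignmentState();
        try {
            state.seek("mytopic", 1, 4); // same order as in the question: seek first
        } catch (IllegalStateException e) {
            System.out.println("seek before assign failed: " + e.getMessage());
        }
        state.assign("mytopic", 1);      // assign first ...
        state.seek("mytopic", 1, 4);     // ... then seek is accepted
        System.out.println("position after assign + seek: " + state.position("mytopic", 1));
    }
}
```

With the real client, the corresponding order would be consumer.assign(Collections.singletonList(new TopicPartition("mytopic", 1))) followed by consumer.seek(new TopicPartition("mytopic", 1), 4), and then the usual poll() loop.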
Btw: KafkaConsumer has a very long and detailed class JavaDoc, including examples. It's worth reading.
If you do not want to call poll() and retrieve records just to change the offset, try this (Kafka version 0.11):
...
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2"));
List<TopicPartition> partitions = consumer.partitionsFor("Test_topic1").stream()
    .map(part -> new TopicPartition(part.topic(), part.partition()))
    .collect(Collectors.toList());
// Access the consumer's private coordinator field via reflection
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator");
coordinatorField.setAccessible(true);
ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer);
coordinator.poll(new Date().getTime(), 1000); // Watch out for your local date and time settings
consumer.seekToBeginning(partitions); // or other seek
Poll for coordinator events. This ensures that the coordinator is known and that the consumer has joined the group (if it is using group management). This also handles periodic offset commits if they are enabled.