Need to fetch messages from a Kafka topic, from a particular offset
Stuck cause of IllegalStateException exception at assign()
If I do not use assign() , then the consumer does not perform seek, as that being a Lazy operation
Actual purpose: Need to iterate messages at topic from a pre-decided offset till end. This pre-decided offset is calculated at markOffset()
static void fetchMessagesFromMarkedOffset() {
Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
consumer.assign(set); // <---- Exception at this place
map.forEach((k,v) -> {
consumer.seek(k, v-3);
});
ConsumerRecords<Long, String> consumerRecords = consumer.poll(100);
consumerRecords.forEach(record -> {
System.out.println("Record Key " + record.key());
System.out.println("Record value " + record.value());
System.out.println("Record partition " + record.partition());
System.out.println("Record offset " + record.offset());
});
consumer.close();
}
Rest of concerned code involved
public static Set<TopicPartition> set;
public static Map<TopicPartition, Long> map;
static void markOffset() {
Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
consumer.poll(100);
set = consumer.assignment();
map = consumer.endOffsets(set);
System.out.println("Topic Partitions: " + set);
System.out.println("End Offsets: " + map);
}
Consumer Creation
private Consumer createConsumer(String topicName) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "capacity-service-application");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
final Consumer consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(topicName));
return consumer;
}
Exception
Exception in thread "main" java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
at org.apache.kafka.clients.consumer.internals.SubscriptionState.setSubscriptionType(SubscriptionState.java:104)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromUser(SubscriptionState.java:157)
at org.apache.kafka.clients.consumer.KafkaConsumer.assign(KafkaConsumer.java:1064)
at com.gaurav.kafka.App.fetchMessagesFromMarkedOffset(App.java:44)
at com.gaurav.kafka.App.main(App.java:30)
You can't mixed manual and automatic partition assignment.
You should use KafkaConsumer::subscribe or KafkaConsumer::assign but not both.
If after calling KafkaConsumer::subscribe you want to switch to manual approach you should first call KafkaConsumer::unsubscribe.
According to https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
Note that it isn't possible to mix manual partition assignment (i.e. using assign) with dynamic partition assignment through topic subscription (i.e. using subscribe).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With