We would like to list all Kafka topics via spring-kafka to get results similar to the kafka command:
bin/kafka-topics.sh --list --zookeeper localhost:2181
When running the getTopics() method in the service below, we get org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Configuration:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
Service:
@Service
public class TopicServiceKafkaImpl implements TopicService {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Override
public Set<String> getTopics() {
try (Consumer<String, String> consumer =
consumerFactory.createConsumer()) {
Map<String, List<PartitionInfo>> map = consumer.listTopics();
return map.keySet();
}
}
Kafka is up and running and we can send messages from our app to a topic succesfully.
You can list topics like this using Admin Client
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
System.out.println("topics:" + adminClient.listTopics(listTopicsOptions).names().get());
You are connecting to Zookeeper (2181) instead of Kafka (9092 by default).
The Java kafka clients no longer talk directly to ZK.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With