List Kafka Topics via Spring-Kafka

We would like to list all Kafka topics via spring-kafka, with results similar to the Kafka command:

bin/kafka-topics.sh --list --zookeeper localhost:2181

When running the getTopics() method in the service below, we get org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata.

Configuration:

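import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}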

Service:

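import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;

@Service
public class TopicServiceKafkaImpl implements TopicService {
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Override
    public Set<String> getTopics() {
        // The consumer is closed automatically when the try block exits
        try (Consumer<String, String> consumer =
            consumerFactory.createConsumer()) {
            Map<String, List<PartitionInfo>> map = consumer.listTopics();
            return map.keySet();
        }
    }
}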

Kafka is up and running and we can send messages from our app to a topic successfully.

Paul Croarkin asked Nov 28 '18 20:11


2 Answers

You can list topics with the AdminClient, connecting to the Kafka broker rather than ZooKeeper:

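    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTopicsOptions;

    Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // try-with-resources closes the client when done
    try (AdminClient adminClient = AdminClient.create(properties)) {
        ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
        listTopicsOptions.listInternal(true); // also include Kafka's internal topics

        // names() returns a future; get() blocks until the topic names arrive
        System.out.println("topics:" + adminClient.listTopics(listTopicsOptions).names().get());
    }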
Abdullah Ahçı answered Oct 03 '22 22:10


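You are connecting to ZooKeeper (2181) instead of Kafka (9092 by default). Set BOOTSTRAP_SERVERS_CONFIG to the address of a Kafka broker, for example "localhost:9092" if the broker uses the default port.

The Java Kafka clients no longer talk directly to ZooKeeper.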

Gary Russell answered Oct 03 '22 22:10
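
Since the root cause here is a bootstrap.servers value that points at ZooKeeper's port, a small startup check might catch the mistake before the metadata fetch times out. The following is only a sketch: BootstrapServersCheck and suspiciousEntries are hypothetical names, not part of Kafka or spring-kafka, and the check assumes ZooKeeper is on its default port 2181.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Kafka or spring-kafka.
final class BootstrapServersCheck {
    // Assumption: ZooKeeper listens on its default client port.
    static final int ZOOKEEPER_DEFAULT_PORT = 2181;

    // Returns the host:port entries of a comma-separated bootstrap.servers
    // value whose port equals ZooKeeper's default port.
    static List<String> suspiciousEntries(String bootstrapServers) {
        List<String> suspicious = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                continue; // no port given, nothing to check
            }
            try {
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                if (port == ZOOKEEPER_DEFAULT_PORT) {
                    suspicious.add(trimmed);
                }
            } catch (NumberFormatException e) {
                // non-numeric port; leave it for the Kafka client to reject
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        System.out.println(suspiciousEntries("localhost:2181")); // [localhost:2181]
        System.out.println(suspiciousEntries("localhost:9092")); // []
    }
}
```

Calling suspiciousEntries on the value passed to BOOTSTRAP_SERVERS_CONFIG and logging a warning when the result is non-empty would have flagged the configuration in the question. A ZooKeeper running on a non-default port would not be detected.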