
Spring kafka and Kafka Cluster

I've configured three Kafka brokers in a cluster and I'm trying to use them with spring-kafka.

But after I kill the Kafka leader, I'm no longer able to send messages to the queue.

I'm setting the spring.kafka.bootstrap-servers property to "kafka-1:9092;kafka-2:9093,kafka-3:9094", and all of the names are in my hosts file.

Kafka version: 0.10

Does anyone know the correct configuration?

Edit

I tested one thing and saw some strange behavior. When I start the service, I send a message to the topic (to force its creation).

Code:

@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
    sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
    return new KafkaSyncListener();
}

This time I did not start the kafka-1 server (just the others), and this exception occurred:

org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

It seems spring-kafka only tries to connect to the first bootstrap server. I'm using spring-kafka 1.3.5.RELEASE and Kafka 0.10.1.1.

Edit 2

I ran the same test that you did, and the same thing happens: when I remove the first Docker container (kafka-1), the leader changes, and my consumer (a Spring service) is not able to consume the messages. But when I start kafka-1 again, the service gets all the messages.

My consumer ConcurrentKafkaListenerContainerFactory configuration:

{
  key.deserializer=class org.apache.kafka.common.serialization.IntegerDeserializer,
  value.deserializer=class org.apache.kafka.common.serialization.StringDeserializer,
  max.poll.records=500,
  group.id=mongo-adapter-service,
  ssl.keystore.location=/certs/kafka.keystore.jks,
  bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
  auto.commit.interval.ms=100,
  security.protocol=SSL,
  max.request.size=5242880,
  ssl.truststore.location=/certs/kafka.keystore.jks,
  auto.offset.reset=earliest
}
Rodolfo Oliveira asked Jun 11 '18 19:06


1 Answer

You need a comma between server addresses, not a semicolon.
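
As a minimal sketch of why the separator matters, the plain-Java check below splits a bootstrap string on commas only, which is how the list is meant to be written, and reports every entry that is not a single host:port pair. The class and method names are hypothetical and are not part of spring-kafka or the Kafka client, and the sketch does not model how the client itself treats a malformed entry. With the value from the question, the semicolon leaves kafka-1 and kafka-2 fused into one entry.

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapServersCheck {

    // Hypothetical helper (not a Kafka or Spring API): splits the value on
    // commas only and returns the entries that are not a plain host:port pair.
    static List<String> malformedEntries(String bootstrapServers) {
        List<String> bad = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            // Simplified pattern for illustration: hostname characters, a colon, digits.
            if (!trimmed.matches("[A-Za-z0-9._-]+:[0-9]+")) {
                bad.add(trimmed);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // Value from the question: the semicolon fuses the first two servers.
        System.out.println(malformedEntries("kafka-1:9092;kafka-2:9093,kafka-3:9094"));
        // Comma-separated value: nothing is flagged.
        System.out.println(malformedEntries("kafka-1:9092,kafka-2:9093,kafka-3:9094"));
    }
}
```

Running main prints [kafka-1:9092;kafka-2:9093] for the first value and [] for the second.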

EDIT

I just ran a test with no problems:

spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

and

@SpringBootApplication
public class So50804678Application {

    public static void main(String[] args) {
        SpringApplication.run(So50804678Application.class, args);
    }

    @KafkaListener(id = "foo", topics = "so50804678")
    public void in(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50804678", 1, (short) 3);
    }

}

and

$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

Killed the leader, and

$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678    PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: so50804678   Partition: 0    Leader: 1   Replicas: 0,1,2 Isr: 1,2

and

$ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9094 --topic so50804678

Sent a message and it was received by the app; no errors in the log except a WARN:

[Consumer clientId=consumer-1, groupId=foo] Connection to node 0 could not be established. Broker may not be available.

I then restarted the dead server; stopped my app; then added this code...

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> {
        while(true) {
            System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
            Thread.sleep(3_000);
        }
    };
}

Again, killing the current leader had no impact; everything recovered ok.

You may need to tweak the listeners/advertised.listeners properties in your server props. Since my brokers are all on localhost, I left them at their defaults.
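
For illustration only, here is a sketch of what those two broker settings might look like for the first broker in the question's setup. The values are assumptions taken from the question (SSL, host name kafka-1, port 9092), not a tested configuration. Clients use the bootstrap list only for the initial connection; after that they connect to whatever addresses the brokers advertise in their metadata, so each broker has to advertise a host name and port that the client can actually reach.

```properties
# server.properties for the broker reachable as kafka-1 (assumed values)
# Address the broker binds to inside its container
listeners=SSL://0.0.0.0:9092
# Address handed back to clients in metadata; must resolve from the client's side
advertised.listeners=SSL://kafka-1:9092
```

The other two brokers would advertise their own names and ports (kafka-2:9093 and kafka-3:9094 in the question) in the same way.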

Gary Russell answered Oct 20 '22 09:10