I've configured a cluster of 3 Kafka brokers and I'm trying to use it with spring-kafka.
But after I kill the Kafka leader I'm not able to send further messages to the topic.
I'm setting the spring.kafka.bootstrap-servers property as "kafka-1:9092;kafka-2:9093,kafka-3:9094", and all of the host names are in my hosts file.
Kafka version 0.10
Does anyone know the correct configuration?
Edit
I have tested something and observed strange behavior. When I start the service I send a message to the topic (to force its creation).
Code:
@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
    sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
    return new KafkaSyncListener();
}
This time I did not start the kafka-1 server (just the others), and I got this exception:
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
It seems spring-kafka only tries to connect to the first bootstrap server. I'm using spring-kafka 1.3.5.RELEASE and Kafka 0.10.1.1.
Edit 2
I ran the same test you did, and the same thing happens: when I remove the first Docker container (kafka-1) the leader changes, but my consumer (the Spring service) is not able to consume messages. When I start kafka-1 again, the service receives all the messages. These are my consumer ConcurrentKafkaListenerContainerFactory properties:
{
  key.deserializer=class org.apache.kafka.common.serialization.IntegerDeserializer,
  value.deserializer=class org.apache.kafka.common.serialization.StringDeserializer,
  max.poll.records=500,
  group.id=mongo-adapter-service,
  ssl.keystore.location=/certs/kafka.keystore.jks,
  bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
  auto.commit.interval.ms=100,
  security.protocol=SSL,
  max.request.size=5242880,
  ssl.truststore.location=/certs/kafka.keystore.jks,
  auto.offset.reset=earliest
}
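For reference, here is a minimal sketch of how properties like these are typically wired into a ConcurrentKafkaListenerContainerFactory with Java config. It is illustrative only (the class name is made up, and the SSL keystore/truststore settings and passwords are omitted):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class ConsumerConfiguration {

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // all three brokers, comma-separated
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9093,kafka-3:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mongo-adapter-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // security.protocol, ssl.keystore.location, etc. would also go here
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}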
You need a comma between server addresses, not a semicolon.
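So, for the servers in your question, the property should be:

spring.kafka.bootstrap-servers=kafka-1:9092,kafka-2:9093,kafka-3:9094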
EDIT
I just ran a test with no problems:
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
and
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class So50804678Application {

    public static void main(String[] args) {
        SpringApplication.run(So50804678Application.class, args);
    }

    @KafkaListener(id = "foo", topics = "so50804678")
    public void in(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        // 1 partition, replication factor 3
        return new NewTopic("so50804678", 1, (short) 3);
    }

}
and
$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: so50804678 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Killed the leader, and
$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: so50804678 Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2
and
$ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9094 --topic so50804678
Sent a message and it was received by the app; no errors in the log except a WARN:
[Consumer clientId=consumer-1, groupId=foo] Connection to node 0 could not be established. Broker may not be available.
I then restarted the dead server; stopped my app; then added this code...
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> {
        while (true) {
            System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
            Thread.sleep(3_000);
        }
    };
}
Again, killing the current leader had no impact; everything recovered ok.
You may need to tweak the listeners/advertised.listeners properties in your broker configuration. Since my brokers are all on localhost, I left them at the defaults.
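The key point is that each broker must advertise a host:port that your clients can actually resolve. A sketch of the relevant lines in kafka-1's server.properties, assuming an SSL listener to match the security.protocol=SSL in your consumer config (hostnames and ports taken from your question; adjust per broker):

listeners=SSL://0.0.0.0:9092
advertised.listeners=SSL://kafka-1:9092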