Use Case:
I am using Spring Boot 2.2.5.RELEASE and Kafka 2.4.1
JAAS/SASL configurations are done properly on Kafka/ZooKeeper as topics are created without issue with kafka-topics.bat
Issue:
When i start Spring Boot application, i immediately get the following errors:
kafka-server-start.bat console:
INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
IDE console:
WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=yyy] Bootstrap broker localhost:9093 (id: -3 rack: null) disconnected
My application.properties configuration:
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="spring_bO0t" password="i_am_a_spring_bO0t_user";
kafka_server_jaas.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="12345"
user_admin="12345"
user_spring_bO0t="i_am_a_spring_bO0t_user";
};
Am i missing something?
Thanks in advance.
I defined the properties in the wrong place i.e in application.properties. As i have ProducerFactory & ConsumerFactory beans, those application.properties will be ignored by Spring Boot.
Configuring the same properties in the beans definitions resolved the issue, i.e move your properties from application.properties to where you define your beans.
Here's an example:
@Bean
public ProducerFactory<Object, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
"%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
));
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
"%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
));
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put("security.protocol", "SASL_PLAINTEXT");
configs.put("sasl.mechanism", "PLAIN");
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=username" +
"password=password;");
return new KafkaAdmin(configs);
}
The answer provided by @jumping_monkey is correct, however I didn't know where to put those configurations in ProducerFactory & ConsumerFactory beans, so I'll leave an example below for those who want to know:
-In your ProducerConfig or ConsumerConfig Beans respectively (Mine is named generalMessageProducerFactory):
@Bean
public ProducerFactory<String, GeneralMessageDto> generalMessageProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put("sasl.mechanism", "PLAIN");
configProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';");
configProps.put("security.protocol", "SASL_SSL");
return new DefaultKafkaProducerFactory<>(configProps);
}
And also in your TopicConfiguration Class in kafkaAdmin method:
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put("sasl.mechanism", "PLAIN");
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';");
configs.put("security.protocol", "SASL_SSL");
return new KafkaAdmin(configs);
}
Hope this was helpful guys!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With