I'm currently setting up simple Kafka authentication using SASL_PLAINTEXT with the PLAIN mechanism, and adding ACL authorization. However, I run into an issue when I try to consume data:
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 2 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 3 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 4 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 5 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 6 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 7 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 8 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 9 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 10 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
Here are my configuration files.
server.properties
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
bootstrap.servers=localhost:9092
compression.type=none
consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice-secret";
};
Environment variable:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"
Commands
Set ACL:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation All --group test-consumer-group --topic test-topic
Start Kafka server:
./bin/kafka-server-start.sh config/server.properties
Start Producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties
Start Consumer:
bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties --bootstrap-server=localhost:9092
When I start the consumer, I get the warnings shown above. The Kafka broker log also contains this error:
[2016-10-22 20:17:14,091] ERROR [KafkaApi-0] Error when handling request {group_id=test-consumer-group} (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651)
at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
at scala.Option.getOrElse(Option.scala:121)
at kafka.server.KafkaApis.getOrCreateGroupMetadataTopic(KafkaApis.scala:657)
at kafka.server.KafkaApis.handleGroupCoordinatorRequest(KafkaApis.scala:818)
at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
How can I fix this?
I fixed the issue by splitting the JAAS configuration into separate server and client files.
kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice-secret";
};
In one terminal, export the server JAAS config file and start the Kafka broker:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"
$ ./bin/kafka-server-start.sh config/server.properties
In a client terminal, export the client JAAS config file and start the consumer:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
$ ./bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties --bootstrap-server=localhost:9092
If you also want to produce, do this in another terminal window:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties
I faced a similar issue when using ACLs in Kafka 0.10. I found this discussion helpful, especially the advice to enable the authorization log in order to check which username arrives with the request and compare it with what is specified in your ACLs.
First, check that the server principal (admin) has been granted all the permissions it needs. The server principal must be allowed to perform all operations on all topics and groups as well as on the cluster. It's better to declare admin as a super user in the server.properties file. If this doesn't resolve the issue, you can enable the authorization log to find out which principal is being denied which operation.
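As a minimal sketch of the super user declaration, assuming the broker authenticates as the admin user from the kafka_server_jaas.conf shown in the question (so the principal is User:admin), the entry in server.properties could look like this. Adjust the principal name to whatever your broker actually logs in as.

```properties
# server.properties
# Assumption: the broker's own principal is "admin", as in the JAAS file above.
# Super users bypass ACL checks; separate multiple principals with semicolons.
super.users=User:admin
```

The broker has to be restarted for the change to take effect.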
The authorization log can be enabled by modifying log4j.properties in the config folder. In that file, change WARN to DEBUG in the following line and restart the Kafka brokers.
log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
This helped me in sorting out my issue. Hope that helps.
PS: The authorization logs are very verbose and consume a lot of disk space, so remember to turn DEBUG off again once you are done debugging.
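For orientation, here is roughly what the authorizer section of the log4j.properties shipped with Kafka 0.10 looks like, as far as I remember it. Treat the appender settings and the log file name as assumptions and check them against your own file, since they may differ between distributions.

```properties
# config/log4j.properties (authorizer section, recalled from the stock file)
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
# Assumption: authorization decisions end up in this file under the Kafka logs directory
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# WARN is the default; DEBUG logs allowed requests as well as denied ones
log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
```

With DEBUG enabled, each entry in that log should show the principal, the operation and the resource, which you can compare against the ACLs you created.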