I'm using AWS MSK and I want to enable ACLs, but I'm unable to create a topic when ACLs are turned on. I'm using the command-line tools for all operations. Here's a summary of what I'm doing:
So the issue is that the topic is getting created in ZooKeeper but the broker can't access it, presumably due to some ACL rule that I'm missing.
Raw output of the commands that I've run:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
(kafka.admin.TopicCommand$)
Running the same command again:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
(kafka.admin.TopicCommand$)
List of topics via AdminClient:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list
List of topics via Zookeeper connect:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5
Here are my ACL rules:
Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
What am I missing?
Kafka manages and enforces ACLs through an authorizer. Kafka provides a default authorizer implementation, SimpleAclAuthorizer, that stores ACLs in ZooKeeper. If no authorizer is configured, everyone is authorized to access any resource. You can also plug in your own Authorizer implementation.
I don't think this has anything to do with AWS MSK; it is rather an issue with your secured Kafka cluster configuration. Both clients (subscribers/producers) and inter-broker actions require authorization in a secured cluster. You'd have the same issue in a non-managed Kafka cluster.
The recommendation is to set up a "superuser" user (I'd call them service accounts) on the servers and then give these "superuser" users ACLs that allow the inter-broker interactions you need for your cluster. The exact ACLs you need are going to vary depending on your use cases and security preferences.
In server.properties you'd add an entry like super.users=User:BrokerService, which is documented at https://docs.confluent.io/current/kafka/authorization.html#kafka-auth-superuser. The documentation uses Alice and Bob as superuser names, which seems confusing to me. Pick whatever user name makes sense for you.
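As a rough sketch of what those broker entries could look like, the snippet below writes them to a scratch file and reads the value back. The principal BrokerService is the illustrative name used in this answer, the scratch file stands in for your real server.properties, and the authorizer class name is my assumption for a Kafka 2.2-era broker (it matches the SimpleAclAuthorizer mentioned above). On a managed service the way you apply broker settings may differ, so treat this as an illustration of the key/value format only.

```shell
#!/bin/sh
# Sketch only: a scratch file stands in for the broker's server.properties.
props="$(mktemp)"

cat > "$props" <<'EOF'
# Assumed authorizer class for a Kafka 2.2-era broker (stores ACLs in ZooKeeper)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Hypothetical broker principal; separate multiple super users with ';'
super.users=User:BrokerService
EOF

# Read the values back to confirm the key=value lines are well-formed.
authorizer="$(grep '^authorizer\.class\.name=' "$props" | cut -d= -f2-)"
super_users="$(grep '^super\.users=' "$props" | cut -d= -f2-)"

echo "authorizer  -> $authorizer"
echo "super.users -> $super_users"

rm -f "$props"
```

With TLS client authentication the principal is usually derived from the certificate subject, as in User:CN=client1.com from the question, so the name in super.users has to match whatever principal the broker actually presents.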
Then you need to set up a similar ACL whose principal is the "superuser" user you created above, e.g. principal=User:BrokerService. The ACL would give whatever permissions the brokers need. It sounds like your immediate use case is to ALLOW READ of all topics. You'll probably need other ACLs for inter-broker communication as well, but I can't tell you what you need exactly without more information about what you want to do.
For example, this command sets up the ACL:
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:BrokerService --operation All --topic '*' --cluster
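After adding an ACL it is worth listing it back to confirm the resource name is what you intended. The ACL listing in the question shows a TOPIC resource whose name is --operation=All, which looks to me like an option ended up being parsed as the topic name; that is my reading of the output, not something I can confirm. The sketch below prints an --add and a --list command for a single topic rather than running them, so you can inspect the argument order first. The ZooKeeper address, the principal, and the run helper are all placeholders of mine.

```shell
#!/bin/sh
# Placeholder values: adjust to your cluster before running for real.
ZK="${ZK:-localhost:2181}"
PRINCIPAL="User:BrokerService"
TOPIC="test3"

# run: print the command when DRY_RUN=1 (the default here), otherwise execute it.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Add an ACL for one topic; the topic name directly follows --topic.
add_out="$(run kafka-acls --authorizer-properties "zookeeper.connect=$ZK" \
  --add --allow-principal "$PRINCIPAL" --operation All --topic "$TOPIC")"

# List the ACLs for that topic to check the stored resource name.
list_out="$(run kafka-acls --authorizer-properties "zookeeper.connect=$ZK" \
  --list --topic "$TOPIC")"

echo "$add_out"
echo "$list_out"
```

Setting DRY_RUN=0 makes the helper execute the commands instead of echoing them. If the listing then shows name=test3 under resourceType=TOPIC, the ACL landed on the topic you meant.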
More options for setting up the ACLs, and a description of your exact problem, are documented here: https://docs.confluent.io/current/kafka/authorization.html#acl-format
Again, please research some more or edit your question if you are looking for an exact configuration to use here, as there are security and use-case implications in which ACLs you use.