
AWS MSK - Timeout when creating a Kafka topic with ACLs turned on

I'm using AWS MSK and I want to enable ACLs, but I'm unable to create a topic when ACLs are turned on. I'm using the command-line tools for all the operations. Here's a summary of what I'm doing:

  • Create a fresh cluster
  • Create a topic - this works fine
  • Turn on ACLs for client1 on resource=CLUSTER and operation=ALL
  • Create a topic using AdminClient (by providing the --bootstrap-server option) - this gives a timeout exception
  • Retry creating the same topic - this gives an error saying the topic already exists
  • List topics using AdminClient - this returns no topics
  • Create a topic using ZooKeeper connect - this works
  • List topics using ZooKeeper connect - this returns all the topics I've created (even those that timed out)

So the topic is being created in ZooKeeper, but the brokers can't access it, presumably because of some ACL rule I'm missing.

Raw output of the commands that I've run:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
 (kafka.admin.TopicCommand$)

Running the same command again:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1

Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
        at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
        at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
        at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
 (kafka.admin.TopicCommand$)

List of topics via AdminClient:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list


List of topics via ZooKeeper connect:

ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5

Here are my ACL rules:

Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
        (principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)

What am I missing?
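The question doesn't show the command used to add the ACLs. Assuming they were added with the stock kafka-acls.sh tool against ZooKeeper (the same $ZK used with kafka-topics.sh above), the CLUSTER entry corresponds to an invocation like the sketch below. grant_cluster_all and the KAFKA_ACLS override (which allows a dry run with echo) are hypothetical names, not part of Kafka. Note also that the second listing is a TOPIC ACL for a topic literally named --operation=All, which suggests a flag was taken as the topic name; it does not match any real topic.

```shell
#!/usr/bin/env bash
# Sketch only: grant a principal every operation on the CLUSTER resource.
# Assumptions (not from the question): ZK holds the ZooKeeper connect string,
# and KAFKA_ACLS may name an alternative binary (e.g. "echo" for a dry run).
grant_cluster_all() {
  local principal="$1"   # e.g. User:CN=client1.com
  "${KAFKA_ACLS:-kafka-acls.sh}" \
    --authorizer-properties "zookeeper.connect=${ZK:?ZK must be set}" \
    --add \
    --allow-principal "$principal" \
    --operation All \
    --cluster
}
```

For example, KAFKA_ACLS=echo ZK=localhost:2181 grant_cluster_all "User:CN=client1.com" prints the arguments instead of changing the cluster.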

Araf, asked Sep 30 '19 18:09


People also ask

How does Kafka ACL work?

Kafka enforces ACLs through a pluggable authorizer. Kafka ships an authorizer implementation, SimpleAclAuthorizer, which stores ACLs in ZooKeeper and is enabled with the broker setting authorizer.class.name. If no authorizer is configured, every client is allowed to access every resource. You can also write your own Authorizer implementation.
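Because SimpleAclAuthorizer keeps its ACLs in ZooKeeper, they can be listed with kafka-acls.sh --list, which prints entries in the same "Current ACLs for resource ..." form shown in the question. A minimal sketch, assuming ZK holds the ZooKeeper connect string; list_acls and the KAFKA_ACLS dry-run override are hypothetical names.

```shell
#!/usr/bin/env bash
# Sketch: list ACLs stored by the ZooKeeper-backed authorizer.
# Assumptions: ZK = ZooKeeper connect string; KAFKA_ACLS optionally overrides the binary.
# Extra arguments (for example --topic test3 or --cluster) narrow the listing.
list_acls() {
  "${KAFKA_ACLS:-kafka-acls.sh}" \
    --authorizer-properties "zookeeper.connect=${ZK:?ZK must be set}" \
    --list "$@"
}
```

Passing no extra arguments lists every ACL; passing --cluster shows only the CLUSTER resource.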

How do I check my AWS MSK broker status?

To view Amazon MSK metrics in CloudWatch, open the CloudWatch console (https://console.aws.amazon.com/cloudwatch/). In the navigation pane, choose Metrics. Choose the All metrics tab, and then choose AWS/Kafka. To view broker-level metrics, choose Broker ID, Cluster Name.
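The same metric listing can be retrieved from the AWS CLI with cloudwatch list-metrics, filtering the AWS/Kafka namespace on the "Cluster Name" dimension. A sketch, assuming the AWS CLI is installed and configured; list_msk_metrics and the AWS_CLI dry-run override are hypothetical names.

```shell
#!/usr/bin/env bash
# Sketch: list CloudWatch metrics in the AWS/Kafka namespace for one MSK cluster.
# Assumptions: AWS CLI configured with credentials; AWS_CLI optionally overrides the binary.
list_msk_metrics() {
  local cluster_name="$1"
  local dims
  # The dimension filter is passed as JSON because the dimension name contains a space.
  dims=$(printf '[{"Name":"Cluster Name","Value":"%s"}]' "$cluster_name")
  "${AWS_CLI:-aws}" cloudwatch list-metrics \
    --namespace AWS/Kafka \
    --dimensions "$dims"
}
```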

Can Lambda write to MSK?

Lambda now supports Amazon MSK as an event source, so a Lambda function can consume messages from MSK topics and integrate with downstream serverless workflows. Apache Kafka is a distributed streaming platform similar to Amazon Kinesis. Amazon MSK simplifies the setup, scaling, and management of clusters running Kafka.


1 Answer

I don't think this has anything to do with AWS MSK; it's rather an issue with your secured Kafka cluster configuration. In a secured cluster, both clients (consumers/producers) and inter-broker actions require authorization. You'd have the same issue in a self-managed Kafka cluster.

The recommendation is to set up a "superuser" user (I'd call them service accounts) on the servers and then give these "superuser" users ACLs that allow the inter-broker interactions your cluster needs. The exact ACLs you need will vary depending on your use cases and security preferences.

In server.properties you'd add an entry like super.users=User:BrokerService, as documented at https://docs.confluent.io/current/kafka/authorization.html#kafka-auth-superuser. The documentation uses Alice and Bob as superuser names, which seems confusing to me; pick whatever user name makes sense for you.
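On a self-managed broker, where you edit server.properties yourself, that change can be scripted as below. This is a minimal sketch: add_super_user is a hypothetical helper, BrokerService is only the placeholder principal used above, and it assumes GNU sed (for sed -i). super.users takes a semicolon-separated list of principals, so an existing entry is extended rather than duplicated. The broker reads this file at startup, so restart it after the change.

```shell
#!/usr/bin/env bash
# Sketch: declare a super user in a self-managed broker's server.properties.
# Assumptions: GNU sed; the caller supplies the file path and principal.
add_super_user() {
  local props="$1" principal="$2"
  if grep -q '^super\.users=' "$props"; then
    # super.users is semicolon-separated: extend the existing entry in place.
    sed -i "s|^super\.users=.*|&;${principal}|" "$props"
  else
    # Make sure the file ends with a newline before appending a new entry.
    if [ -s "$props" ] && [ -n "$(tail -c 1 "$props")" ]; then
      printf '\n' >> "$props"
    fi
    printf 'super.users=%s\n' "$principal" >> "$props"
  fi
}
```

For example, add_super_user /path/to/server.properties User:BrokerService adds super.users=User:BrokerService, and a second call with another principal appends it to the same line.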

Then you need to set up a similar ACL with a user-name principal for the "superuser" you created above, e.g. principal=User:BrokerService. The ACL would give whatever permissions the brokers need. It sounds like your immediate use case is to ALLOW READ on all topics. You'll probably need other ACLs for inter-broker communication as well, but I can't tell you exactly what you need without more information about what you want to do.

For example, this command sets up the ACL:

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:BrokerService --operation All --topic '*' --cluster

More options for setting up ACLs, and a description of your exact problem, are documented at https://docs.confluent.io/current/kafka/authorization.html#acl-format

Again, please research further or edit your question if you're looking for an exact configuration, since the ACLs you choose have security and use-case implications.

Dude0001, answered Oct 20 '22 16:10