Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Connect Python to MSK with IAM role-based authentication

I've written a Python script with aiokafka to produce to and consume from a Kafka cluster in AWS MSK. I'm running the script from an EC2 instance in the same VPC as my cluster, but when I try to connect the script to the cluster, the connection is refused:

The script

from aiokafka import AIOKafkaConsumer
import asyncio
import os
import sys


async def consume():
    """Consume messages from a Kafka topic and print each one.

    Connection settings are read from environment variables:
      BOOTSTRAP_SERVER  bootstrap broker(s) (default 'localhost:9092')
      TOPIC             topic to subscribe to (default 'demo')
      GROUP_ID          consumer group id (default 'demo-group')

    The consumer is always stopped on exit, including when start() fails.
    """
    bootstrap_server = os.environ.get('BOOTSTRAP_SERVER', 'localhost:9092')
    topic = os.environ.get('TOPIC', 'demo')
    group = os.environ.get('GROUP_ID', 'demo-group')
    consumer = AIOKafkaConsumer(
        topic, bootstrap_servers=bootstrap_server, group_id=group
    )

    try:
        # start() is inside the try so that a failed bootstrap (e.g. the
        # broker closing the connection) still reaches stop() and releases
        # whatever the client opened, instead of leaving it unclosed.
        await consumer.start()
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()


def main():
    """Run consume() on a fresh event loop until the user presses Ctrl+C."""
    try:
        asyncio.run(consume())
    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop, so report success to the shell.
        print("Bye!")
        raise SystemExit(0)


if __name__ == "__main__":
    # Banner first, so the user knows how to stop the long-running loop.
    print("Welcome to Kafka test script. ctrl + c to exit")
    main()

The exception (this traceback comes from the producer variant of the script, producer.py)

Unable to request metadata from "boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098": KafkaConnectionError: Connection at boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 closed
Traceback (most recent call last):
  File "producer.py", line 33, in <module>
    main()
  File "producer.py", line 25, in main
    asyncio.run(produce_message(message))
  File "/usr/lib64/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/lib64/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "producer.py", line 12, in produce_message
    await producer.start()
  File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/producer/producer.py", line 296, in start
    await self.client.bootstrap()
  File "/home/ec2-user/py-kafka-test/pykafka/lib64/python3.7/site-packages/aiokafka/client.py", line 250, in bootstrap
    f'Unable to bootstrap from {self.hosts}')
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com', 9098, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f76d123a510>

I've already tested the connection with the Kafka command-line scripts, and it works fine:

./kafka-console-producer.sh --bootstrap-server boot-xxxxxxx.cx.kafka-serverless.us-xxxx-1.amazonaws.com:9098 --producer.config client.properties  --topic myTopic

But whenever I try with Python, it just doesn't work. I've investigated a little and found that the problem might be the authentication protocol: my MSK cluster is protected with IAM role-based authentication, but no matter how much I search, I can't find any documentation on how to authenticate with IAM in the Python Kafka libraries (aiokafka, kafka-python, faust, etc.).

Does anyone have an example of how to successfully connect to an MSK Serverless cluster with IAM role-based authentication using Python?

like image 834
Israel Calderón Avatar asked Jun 01 '26 03:06

Israel Calderón


1 Answer

AWS has officially released aws-msk-iam-sasl-signer-python. Here is example code using aiokafka together with aws-msk-iam-sasl-signer-python, tested with these package versions (requirements.txt):

aiokafka==0.10.0
aws-msk-iam-sasl-signer-python==1.0.1
import asyncio
import os, sys
from aiokafka import AIOKafkaConsumer
from aiokafka.abc import AbstractTokenProvider
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import ssl

def create_ssl_context(cafile=None):
    """Build the client TLS context for the SASL_SSL connection to MSK.

    Server certificate verification and hostname checking stay enabled.
    Disabling them would let anyone on the network path impersonate the
    brokers and capture the IAM auth token sent during the SASL handshake.

    Args:
        cafile: Optional path to a PEM CA bundle. When given, it is trusted
            *instead of* the system default CA certificates. When None (the
            default), the system trust store is used.

    Returns:
        ssl.SSLContext: a client context with ``verify_mode == CERT_REQUIRED``
        and ``check_hostname`` enabled.
    """
    # create_default_context(SERVER_AUTH) requires a valid server certificate,
    # checks the hostname, disables SSLv2/SSLv3 and loads the default CA
    # certificates (or cafile), so no manual option flags are needed.
    return ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)

class AWSTokenProvider(AbstractTokenProvider):
    """aiokafka OAUTHBEARER token provider backed by MSK IAM auth tokens.

    Tokens are generated with ``MSKAuthTokenProvider.generate_auth_token``
    (aws-msk-iam-sasl-signer-python) for the configured AWS region.

    Args:
        region: AWS region of the MSK cluster. When omitted, the
            ``AWS_REGION`` environment variable is read on every token
            request.
        **config: Forwarded unchanged to ``AbstractTokenProvider``.
    """

    def __init__(self, region=None, **config):
        super().__init__(**config)
        self._region = region

    async def token(self):
        """Return an auth token without blocking the event loop.

        ``_token`` is a plain synchronous call, so it is run in the event
        loop's default executor.
        """
        return await asyncio.get_running_loop().run_in_executor(None, self._token)

    def _token(self):
        """Generate an MSK IAM auth token synchronously.

        Raises:
            RuntimeError: if no region was passed to the constructor and the
                ``AWS_REGION`` environment variable is unset or empty.
        """
        region = self._region or os.getenv('AWS_REGION')
        if not region:
            # Fail with an actionable message rather than signing a token for
            # region None, which only surfaces later as an opaque auth error.
            raise RuntimeError(
                "AWS region is not configured: pass region=... to "
                "AWSTokenProvider or set the AWS_REGION environment variable"
            )
        # Only the token is needed; the second returned value (presumably the
        # expiry) is ignored.
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

async def consume():
    """Consume messages from MSK with IAM authentication and print each one.

    Settings are read from environment variables:
      KAFKA_TOPIC     topic to subscribe to (default 'demo')
      KAFKA_GROUP_ID  consumer group id (default 'demo-group')
      KAFKA_SERVER    bootstrap broker(s); the default is an example MSK
                      broker address on port 9098

    The connection uses SASL_SSL with the OAUTHBEARER mechanism. Tokens come
    from AWSTokenProvider and TLS settings from create_ssl_context(). The
    consumer is always stopped on exit, including when start() fails.
    """
    topic = os.getenv('KAFKA_TOPIC', 'demo')
    group_id = os.getenv('KAFKA_GROUP_ID', 'demo-group')
    bootstrap_servers = os.getenv(
        'KAFKA_SERVER',
        'b-1.mykafka.abcdef.c5.kafka.us-west-2.amazonaws.com:9098',
    )

    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        security_protocol='SASL_SSL',
        ssl_context=create_ssl_context(),
        sasl_mechanism="OAUTHBEARER",
        sasl_oauth_token_provider=AWSTokenProvider(),
    )

    try:
        # start() is inside the try so that a failed bootstrap or SASL
        # handshake still reaches stop() and releases the client's resources.
        await consumer.start()
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()


def main():
    """Run consume() on a fresh event loop until the user presses Ctrl+C."""
    try:
        asyncio.run(consume())
    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop, so report success to the shell.
        print("Bye!")
        raise SystemExit(0)


if __name__ == "__main__":
    # Only start consuming when executed as a script, not on import.
    main()
like image 159
Zuerst Avatar answered Jun 02 '26 20:06

Zuerst