
Kafka 10 - Python Client with Authentication and Authorization

I have a Kafka 0.10 cluster with SASL_SSL enabled (authentication via JAAS, plus authorization). I am able to connect through SASL using the Java client with the properties below.

ssl.keystore.location="client_keystore.jks"
ssl.keystore.password="password"
ssl.truststore.location="clienttruststore"
ssl.truststore.password="password" 

and by passing the JAAS configuration file through the JVM parameters:

-Djava.security.auth.login.config=/path/to/client_jaas.conf

Is there any way to achieve the same thing with the Python client?

user1578872 asked Mar 23 '17 21:03



2 Answers

I've been connecting to IBM Message Hub, which is Kafka under the hood, using code like this:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import ssl
import sys

sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'

# Create a new context using system defaults, disable all but TLS1.2.
# The OP_NO_* flags must be OR-ed in; AND-ing them would clear other options.
context = ssl.create_default_context()
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1

# `app` is the application object that holds the configuration (not shown here)
producer = KafkaProducer(bootstrap_servers = app.config['KAFKA_BROKERS_SASL'],
                         sasl_plain_username = app.config['KAFKA_USERNAME'],
                         sasl_plain_password = app.config['KAFKA_PASSWORD'],
                         security_protocol = security_protocol,
                         ssl_context = context,
                         sasl_mechanism = sasl_mechanism,
                         api_version = (0,10),
                         retries=5)

def send_message(message):
    # Encode the text as UTF-8 bytes and queue it for sending
    try:
        producer.send(app.config['KAFKA_TOPIC'], message.encode('utf-8'))
    except Exception:
        print("Unexpected error:", sys.exc_info()[0])
        raise
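
On Python 3.7 and later, the same TLS restriction can probably be expressed with the context's minimum_version attribute rather than the OP_NO_* flags. This is only a minimal sketch, assuming the broker accepts TLS 1.2 or newer; the helper name make_tls12_context is made up for illustration.

```python
import ssl


def make_tls12_context():
    """Return a client-side SSL context that refuses anything older than TLS 1.2."""
    # create_default_context() enables certificate and hostname verification
    context = ssl.create_default_context()
    # Reject TLS 1.0 and 1.1 (and older) during the handshake
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


context = make_tls12_context()
```

The resulting object would be passed to the client as ssl_context=context, the same way as in the code above.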
Chris Snow answered Sep 28 '22 06:09


Below are the configurations that worked for me for SASL_SSL using the kafka-python client. I am using kafka-python 1.4.6 with Kafka 2.2.0 on CentOS 6. The same configurations can also be used for the PLAINTEXT, SSL, and SASL_PLAINTEXT security protocols.

Bash script to generate key files, CARoot, and self-signed cert for use with SSL:

#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:admin123
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

You can then use the following command to extract the CARoot.pem:

keytool -exportcert -alias CARoot -keystore server.keystore.jks -rfc -file CARoot.pem

In my server.properties file I have:

listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
security.protocol=SSL
sasl.enabled.mechanisms=PLAIN
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=admin123
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=admin123
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
advertised.listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
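
Since each listener is bound to its own port, the Python client has to point at the port that matches its security_protocol. As a rough sketch, the listeners value can be split into a lookup table; the function name listener_ports is hypothetical, and the sketch assumes listener names equal the protocol names, as they do in the line above.

```python
def listener_ports(listeners):
    """Map each protocol in a `listeners` value to its (host, port) pair."""
    result = {}
    for entry in listeners.split(","):
        # Each entry looks like PROTOCOL://host:port
        protocol, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[protocol] = (host, int(port))
    return result


listeners = ("PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,"
             "SSL://localhost:9093,SASL_SSL://localhost:9094")
ports = listener_ports(listeners)

# Pick the address that matches the client's security_protocol
host, port = ports["SASL_SSL"]
bootstrap = "%s:%d" % (host, port)
print(bootstrap)  # localhost:9094
```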

In my JAAS configuration file (/etc/kafka/kafka_plain_jaas.conf):

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kafka"
   password="kafka-secret"
   user_username="password";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
  username="username"
  password="password";
};
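
kafka-python does not read JAAS files; with the PLAIN mechanism the credentials are passed directly as sasl_plain_username and sasl_plain_password. If you would rather keep a single source of truth, something like the following sketch could pull them out of the KafkaClient section. The function name read_jaas_client_credentials is made up, and the parsing is deliberately naive: it assumes simple values without spaces, quotes, or semicolons.

```python
import re


def read_jaas_client_credentials(text, section="KafkaClient"):
    """Extract (username, password) from one section of JAAS file contents."""
    # Grab everything between "Section {" and the closing "};"
    match = re.search(r"%s\s*\{(.*?)\}\s*;" % re.escape(section), text, re.DOTALL)
    if match is None:
        raise ValueError("section %r not found" % section)
    # Collect key=value options; values may or may not be quoted
    options = dict(re.findall(r'(\w+)\s*=\s*"?([^";\s]+)"?', match.group(1)))
    return options["username"], options["password"]


sample = '''
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
  username="username"
  password="password";
};
'''
username, password = read_jaas_client_credentials(sample)
```

The two values can then be handed to KafkaConsumer or KafkaProducer as sasl_plain_username and sasl_plain_password.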

Before starting the Kafka server, run the following:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_plain_jaas.conf"

Python consumer and producer: the ssl_context and api_version arguments are what caused SSL handshake errors for me, leading to a timeout, so I commented them out. (Some tutorials out there recommend using them.)

from kafka import KafkaConsumer, KafkaProducer
import ssl
import logging
logging.basicConfig(level=logging.DEBUG)

try:
    topic = "sendMessage"
    sasl_mechanism = "PLAIN"
    username = "username"
    password = "password"
    security_protocol = "SASL_SSL"

    # Custom SSL context, left disabled because it caused handshake errors
    #context = ssl.create_default_context()
    #context.options |= ssl.OP_NO_TLSv1
    #context.options |= ssl.OP_NO_TLSv1_1

    consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9094',
                             #api_version=(0, 10),
                             security_protocol=security_protocol,
                             #ssl_context=context,
                             ssl_check_hostname=True,
                             ssl_cafile='../keys/CARoot.pem',
                             sasl_mechanism=sasl_mechanism,
                             sasl_plain_username=username,
                             sasl_plain_password=password)
                             #ssl_certfile='../keys/certificate.pem',
                             #ssl_keyfile='../keys/key.pem')

    producer = KafkaProducer(bootstrap_servers='localhost:9094',
                             #api_version=(0, 10),
                             security_protocol=security_protocol,
                             #ssl_context=context,
                             ssl_check_hostname=True,
                             ssl_cafile='../keys/CARoot.pem',
                             sasl_mechanism=sasl_mechanism,
                             sasl_plain_username=username,
                             sasl_plain_password=password)
                             #ssl_certfile='../keys/certificate.pem',
                             #ssl_keyfile='../keys/key.pem')

    # Write hello world to test topic (the payload must be bytes)
    producer.send(topic, b"Hello World SSL")
    producer.flush()

    # Blocks and prints messages as they arrive
    for msg in consumer:
        print(msg)

except Exception as e:
    print(e)
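
The consumer and producer above take an identical set of security arguments, so one option is to build them once and unpack the same dict into both constructors. A small sketch under that assumption; sasl_ssl_config is a hypothetical helper, while the keys are the kafka-python keyword arguments already used above.

```python
def sasl_ssl_config(bootstrap_servers, username, password, cafile):
    """Build the keyword arguments shared by KafkaConsumer and KafkaProducer."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
        "ssl_check_hostname": True,
        "ssl_cafile": cafile,
    }


config = sasl_ssl_config("localhost:9094", "username", "password",
                         "../keys/CARoot.pem")

# With kafka-python installed and the broker running, both clients
# could be created from the same settings:
# consumer = KafkaConsumer("sendMessage", **config)
# producer = KafkaProducer(**config)
```

Roughly speaking, ssl_cafile plays the role of the Java truststore here, and the two sasl_plain_* values replace the KafkaClient entry of the JAAS file.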
Mohammad Anwar answered Sep 28 '22 08:09