I have a Kafka10 cluster with SASL_SSL (Authentication( JAAS ) and Authorization) enabled. Able to connect thru SASL using the Java client with the below props.
ssl.keystore.location="client_keystore.jks"
ssl.keystore.password="password"
ssl.truststore.location="clienttruststore"
ssl.truststore.password="password"
and passing the JAAS conf file thru the JVM params.
-Djava.security.auth.login.config=/path/to/client_jaas.conf
Is there anyway to achieve the same thing with the python client?
Kafka uses SASL to perform authentication. It currently supports many mechanisms including PLAIN , SCRAM , OAUTH and GSSAPI and it allows administrator to plug custom implementations. Authentication can be enabled between brokers, between clients and brokers and between brokers and ZooKeeper.
PLAIN, or SASL/PLAIN, is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Apache Kafka® supports a default implementation for SASL/PLAIN, which can be extended for production use.
Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official java client, with a sprinkling of pythonic interfaces (e.g., consumer iterators).
Python KafkaThere are a number of API libraries available that can let us publish messages to Kafka topics and consume messages from Kafka topics.
I've been connecting to IBM Message Hub which is kafka under the hood using code like this:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import ssl
sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'
# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1
producer = KafkaProducer(bootstrap_servers = app.config['KAFKA_BROKERS_SASL'],
sasl_plain_username = app.config['KAFKA_USERNAME'],
sasl_plain_password = app.config['KAFKA_PASSWORD'],
security_protocol = security_protocol,
ssl_context = context,
sasl_mechanism = sasl_mechanism,
api_version = (0,10),
retries=5)
def send_message(message):
try:
producer.send(app.config['KAFKA_TOPIC'], message.encode('utf-8'))
except:
print("Unexpected error:", sys.exc_info()[0])
raise
Below are the configurations that worked for me for SASL_SSL using kafka-python client. I am using kafka-python 1.4.6 with kafka 2.2.0 on CentOS 6. These configurations can be used for PLAINTEXT and SSL security protocols along with SASL_SSL and SASL_PLAINTEXT.
Bash script to generate key files, CARoot, and self-signed cert for use with SSL:
#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:admin123
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
You can then use the following command to extract the CARoot.pem:
keytool -exportcert -alias CARoot -keystore server.keystore.jks -rfc -file CARoot.pem
In my server.properties file I have:
listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
security.protocol=SSL
sasl.enabled.mechanisms=PLAIN
ssl.truststore.location=/var/private/ssl/server.truststore.jks
ssl.truststore.password=admin123
ssl.keystore.location=/var/private/ssl/server.keystore.jks
ssl.keystore.password=admin123
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
advertised.listeners=PLAINTEXT://localhost:9091,SASL_PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094
In my JAAS configuration file(/etc/kafka/kafka_plain_jaas.conf):
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=kafka
password=kafka-secret
user_username=password;
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=username
password=password;
};
Before starting the Kafka server, need to run the following:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_plain_jaas.conf"
Python consumer and producer: The ssl_context and api_version are what caused SSL handshake errors to occur for me, leading to a timeout. So I commented those out. (There were some tutorials out there that mentioned to use those.)
from kafka import KafkaConsumer, KafkaProducer
import kafka
import ssl
import logging
logging.basicConfig(level=logging.DEBUG)
try:
topic = "sendMessage"
sasl_mechanism = "PLAIN"
username = "username"
password = "password"
security_protocol = "SASL_SSL"
#context = ssl.create_default_context()
#context.options &= ssl.OP_NO_TLSv1
#context.options &= ssl.OP_NO_TLSv1_1
consumer = KafkaConsumer(topic, bootstrap_servers='localhost:9094',
#api_version=(0, 10),
security_protocol=security_protocol,
#ssl_context=context,
ssl_check_hostname=True,
ssl_cafile='../keys/CARoot.pem',
sasl_mechanism = sasl_mechanism,
sasl_plain_username = username,
sasl_plain_password = password)
#ssl_certfile='../keys/certificate.pem',
#ssl_keyfile='../keys/key.pem')#,api_version = (0, 10))
producer = KafkaProducer(bootstrap_servers='localhost:9094',
#api_version=(0, 10),
security_protocol=security_protocol,
#ssl_context=context,
ssl_check_hostname=True,
ssl_cafile='../keys/CARoot.pem',
sasl_mechanism=sasl_mechanism,
sasl_plain_username=username,
sasl_plain_password=password)
#ssl_certfile='../keys/certificate.pem',
#ssl_keyfile='../keys/key.pem')#, api_version = (0,10))
# Write hello world to test topic
producer.send(topic, bytes("Hello World SSL"))
producer.flush()
for msg in consumer:
print(msg)
except Exception as e:
print e
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With