Spark Structured Streaming with Kafka SASL/PLAIN authentication

Is there a way of connecting a Spark Structured Streaming Job to a Kafka cluster which is secured by SASL/PLAIN authentication?

I was thinking about something similar to:

val df2 = spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=...")
    .option("subscribe", "topic1")
    .load();

It seems that while Spark Structured Streaming recognizes the kafka.bootstrap.servers option, it does not recognize the other SASL-related options. Is there a different way?

asked Apr 28 '20 by user152468

People also ask

What is SASL authentication in Kafka?

PLAIN, or SASL/PLAIN, is a simple username/password authentication mechanism, typically used with TLS for encryption to implement secure authentication. Apache Kafka® supports a default implementation of SASL/PLAIN, which can be extended for production use.
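For reference, the same mechanism expressed as plain Kafka client properties might look like the following sketch (the broker list and credentials are placeholders):

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";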

What is the difference between Kafka and Spark Streaming? Can Spark Streaming do the same job as Kafka?

Because Spark allows users to pull data, hold it, process it, and push it from source to target, it enables the ETL process. Kafka, however, does not offer exclusive ETL services. Instead, it relies on the Kafka Connect API and the Kafka Streams API for building streaming data pipelines from source to destination.

What is Spark Structured Streaming?

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. Using Structured Streaming, you can express your streaming computation the same way you would express a batch computation on static data. In this post, we will learn how to use Apache Spark and Spark Structured Streaming with Apache Kafka.
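That batch/streaming parity shows up directly in the code: swapping read for readStream is often the only structural change. A minimal sketch, assuming a local broker and a hypothetical "events" topic:

# Batch read of a Kafka topic
batch_df = (spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load())

# Streaming read: same computation expressed the same way, different entry point
stream_df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load())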

How to authenticate against Kafka cluster in Apache Spark?

Spark supports the following ways to authenticate against a Kafka cluster: delegation tokens (introduced in Kafka broker 1.1.0) and JAAS login configuration. With delegation tokens, the application can be configured via Spark parameters and may not need a JAAS login configuration at all (Spark can use Kafka's dynamic JAAS configuration feature).
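As a hedged sketch of the delegation-token route (property names as in the Spark 3.x Kafka integration guide; "mycluster" is an arbitrary identifier, and the brokers must support token authentication, so check your version's documentation):

spark-submit \
  --conf spark.kafka.clusters.mycluster.auth.bootstrap.servers=broker1:9093 \
  --conf spark.kafka.clusters.mycluster.security.protocol=SASL_SSL \
  yourapp.py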

What is structured streaming in Kafka?

Structured Streaming manages which offsets are consumed internally, rather than relying on the Kafka consumer to do it. This ensures that no data is missed when new topics/partitions are dynamically subscribed.
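Because Spark, not the Kafka consumer group, tracks offsets, offset behavior is controlled through source options. A short sketch using option names from the Spark Kafka integration guide:

df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic1")
    # Where to start when there is no checkpoint yet; checkpointed offsets win afterwards
    .option("startingOffsets", "earliest")
    # Don't fail the query if already-committed offsets were aged out by retention
    .option("failOnDataLoss", "false")
    .load())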

What happened to RDDs in Spark Streaming with Kafka?

RDDs are no longer the preferred abstraction layer, and the earlier Spark Streaming with Kafka example used DStreams, which was the Spark Streaming abstraction over streams of data at the time. Some of you might recall that DStreams were built on the foundation of RDDs.


1 Answer

Here is a full example in PySpark.

For test/dev, you can inline the JAAS config in your options:

options = {
    # Inline JAAS config: convenient for test/dev, but avoid hardcoding credentials in production
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";',
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": bootstrap_servers,
    # Note: only "kafka."-prefixed options are forwarded to the Kafka consumer;
    # recent Spark versions accept "kafka.group.id" if you need a fixed group id
    "group.id": group_id,
    "subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()
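To sanity-check that authentication works end to end, a console-sink query is a quick sketch (assumes the df above):

query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("console")
    .option("truncate", "false")
    .start())
query.awaitTermination()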

If you use this mode in production you're going to want your JAAS config in a file. Note that the file is not a verbatim copy of the option value: Kafka clients look the login module up under a KafkaClient context, so wrap it accordingly. A minimal jaas.conf sketch with the same placeholder credentials:
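KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="USERNAME"
  password="PASSWORD";
};

Then remove the kafka.sasl.jaas.config key from the options: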

options = {
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": bootstrap_servers,
    "group.id": group_id,
    "subscribe": topic,
}
df = spark.readStream.format("kafka").options(**options).load()

Then provide the file path to spark-submit. For example:

spark-submit \
  --driver-java-options -Djava.security.auth.login.config=/path/to/jaas.conf \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 yourapp.py

You will need to choose the right path and versions for your application.
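One caveat: --driver-java-options only sets the JAAS system property on the driver JVM. If executors open their own Kafka connections (they do for the actual reads), you may also need to ship the file and set the executor option. A sketch, assuming the same jaas.conf:

spark-submit \
  --files /path/to/jaas.conf \
  --driver-java-options -Djava.security.auth.login.config=/path/to/jaas.conf \
  --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 yourapp.py

With --files, Spark places the file in each executor's working directory, which is why the executor-side path is the bare file name.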

answered Oct 13 '22 by Carter Shanklin