I am trying to use the Structured Streaming API to connect to a Kerberos-secured Kafka cluster. Below is my code and the output from Spark. I don't see any exceptions, just WARN messages that the client disconnects. What would be a next step to troubleshoot this?
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Logger, Level}
object Main {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val spark = SparkSession.builder()
.master("local[*]")
.appName("myapp")
.config("spark.executor.extraJavaOptions", "java.security.auth.login.config=jaas.conf")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
.option("security.protocol", "SASL_PLAINTEXT")
.option("sasl.kerberos.service.name", "mysvcname")
.option("subscribe", "mytopic")
.load()
val query = lines.select("value").writeStream.format("console").start()
query.awaitTermination()
}
Here's the output:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/02/11 17:15:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/02/11 17:15:10 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker2:9100 (id: -2 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker1:9100 (id: -1 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker2:9100 (id: -2 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker1:9100 (id: -1 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker1:9100 (id: -1 rack: null) disconnected
19/02/11 17:15:11 WARN NetworkClient: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cef02569-ab16-4ca2-a9e8-18bcea992c0d--1359730493-driver-0] Bootstrap broker broker2:9100 (id: -2 rack: null) disconnected
...
I figured out my problem. When specifying the security protocol option, the option name must be prefixed with "kafka.". This is confusing because for a regular Kafka consumer the option is simply security.protocol but for the purpose of configuring Spark both bootstrap.servers and security.protocol (and presumably any other options/properties you may need) must be prefixed with kafka.. My original code was:
.option("security.protocol", "SASL_PLAINTEXT")
the correct option is:
.option("kafka.security.protocol", "SASL_PLAINTEXT")
Here's the complete code that works:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
object Main {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.INFO)
Logger.getLogger("akka").setLevel(Level.INFO)
val spark = SparkSession.builder()
.master("local[*]")
.appName("myapp")
.config("spark.executor.extraJavaOptions", "java.security.auth.login.config=c:/krb/jaas.conf")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("subscribe", "mytopic")
.load()
val query = lines.select("value").writeStream.format("console").start()
query.awaitTermination()
}
}
For reference, here's the content of the jaas.conf file:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="c:/krb/mykeytab.keytab"
principal="[email protected]"
storeKey=true
useTicketCache=false
serviceName="myservicename";
};
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With