
Integrating Spark Structured Streaming with the Confluent Schema Registry


I'm using a Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use the Confluent Schema Registry, but integrating it with Spark Structured Streaming seems to be impossible.

I have seen this question, but I was unable to get it working with the Confluent Schema Registry: Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)

asked Feb 20 '18 by Souhaib Guitouni

People also ask

What is the difference between spark streaming and structured streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. Structured Streaming, in contrast, is built on the SparkSQL API for data stream processing. In the end, both APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.
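For illustration, a minimal Structured Streaming sketch of that SparkSQL-based model (the bootstrap server and topic name are placeholders, not taken from the question below); a stream is just a DataFrame, so Catalyst plans it like any batch query:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("structured-streaming-sketch").getOrCreate()

//A streaming DataFrame backed by a Kafka source; the options here are placeholders.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", "some-topic")
  .load()

//Ordinary SparkSQL operations apply to the streaming DataFrame.
val counts = df.groupBy("key").count()

counts.writeStream
  .outputMode("complete")  //aggregations require complete or update output mode
  .format("console")
  .start()
  .awaitTermination()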


1 Answer

It took me a couple of months of reading source code and testing things out. In a nutshell, Spark can only handle String and Binary serialization, so you must deserialize the data manually. In Spark, create the Confluent RestService object to fetch the schema, convert the schema string in the response object into an Avro schema using the Avro parser, then read the Kafka topic as normal and map over the binary-typed "value" column with Confluent's KafkaAvroDeserializer. I strongly suggest reading the source code for these classes, because there is a lot going on here; for brevity I'll leave out many details.

//Used Confluent version 3.2.2 to write this.
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns an io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get the Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//The key schema is typically just a string, but you can do the same process for the key as for the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema Registry url.
//This is the only required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using the Spark Structured Streaming map. Avoids a non-serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create a structured streaming DF to read from the topic.
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._  //needed for the case class encoder used by .map below

val rawTopicMessageDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()

//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map { row =>
  if (keyDeserializer == null) {
    keyDeserializer = new KafkaAvroDeserializer
    keyDeserializer.configure(props.asJava, true)  //isKey = true
  }
  if (valueDeserializer == null) {
    valueDeserializer = new KafkaAvroDeserializer
    valueDeserializer.configure(props.asJava, false)  //isKey = false
  }

  //Pass the Avro schema.
  //The topic name is actually unused in the deserializer's source code; it's just required by the signature. Weird right?
  val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString
  val deserializedValueString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString

  DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()

deserializedDSOutputStream.awaitTermination()
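As a side note on why the manual step above works at all: Confluent frames every record as one magic byte (0), a 4-byte big-endian schema id, and then the Avro-encoded payload. Knowing that, you could also skip KafkaAvroDeserializer and decode with plain Avro classes. A hedged sketch, reusing the topicValueAvroSchema fetched above; the helper name decodeConfluentAvro is made up for illustration:

import java.nio.ByteBuffer
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

//Hypothetical helper, not part of the answer above: strips the Confluent
//framing (magic byte + schema id) and decodes the remaining Avro payload.
def decodeConfluentAvro(bytes: Array[Byte], schema: Schema): GenericRecord = {
  val buffer = ByteBuffer.wrap(bytes)
  require(buffer.get() == 0, "Unknown magic byte; not a Confluent-framed record")
  val schemaId = buffer.getInt()  //identifies the writer schema in the registry; unused in this sketch
  val payload = new Array[Byte](buffer.remaining())
  buffer.get(payload)
  val reader = new GenericDatumReader[GenericRecord](schema)
  reader.read(null, DecoderFactory.get().binaryDecoder(payload, null))
}

The trade-off is that you must fetch and cache the writer schema for each schema id yourself, which is exactly what KafkaAvroDeserializer does for you.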
answered Oct 22 '22 by tstites