I am trying to use the Structured Streaming approach in Spark Streaming, based on the DataFrame/Dataset API, to load a stream of data from Kafka.
I use:
The Spark Kafka DataSource has the following underlying schema defined:
|key|value|topic|partition|offset|timestamp|timestampType|
My data comes in JSON format and is stored in the value column. I am looking for a way to extract the underlying schema from the value column and expand the received DataFrame into the columns stored in value. I tried the approach below, but it does not work:
import org.apache.spark.sql.Column

val columns = Array("column1", "column2") // names of the JSON fields
val rawKafkaDF = sparkSession.sqlContext.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe",topic)
.load()
val columnsToSelect = columns.map( x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)
// some analytics using stream dataframe kafkaDF
val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()
Here I get the exception org.apache.spark.sql.AnalysisException: Can't extract value from value#337; because at the time the stream is created, the values inside value are not known.
Do you have any suggestions?
From the Spark perspective, value is just a sequence of bytes. Spark has no knowledge of the serialization format or content. To be able to extract a field, you have to parse the value first.
If the data is serialized as a JSON string, you have two options. You can cast value to StringType and use from_json, providing a schema:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._ // enables the $"colName" syntax

// replace ??? with the actual field types (e.g. StringType, IntegerType)
val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))
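As a minimal sketch, assuming the two hypothetical fields column1 (a string) and column2 (an integer), you could alias the parsed struct and expand it back into top-level columns so the rest of the pipeline can use them directly:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._

// assumed field types -- adjust to match your actual JSON payload
val assumedSchema = StructType(Seq(
  StructField("column1", StringType),
  StructField("column2", IntegerType)
))

val kafkaDF = rawKafkaDF
  .select(from_json($"value".cast(StringType), assumedSchema).alias("data"))
  .select("data.*") // expand the parsed struct into top-level columns

val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()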
Alternatively, cast value to StringType and extract fields by path using get_json_object:
import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = ??? // names of the JSON fields to extract

// cast value to a string first, then extract each field by its JSON path
val exprs = columns.map(c => get_json_object($"value".cast("string"), s"$$.$c"))
rawKafkaDF.select(exprs: _*)
and cast the extracted values to the desired types later.
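As a usage sketch, again assuming the hypothetical fields column1 (string) and column2 (integer): get_json_object always returns string columns, so the cast is applied afterwards.

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.IntegerType
import sparkSession.implicits._

// extract each field by path, then cast to the target type
val kafkaDF = rawKafkaDF.select(
  get_json_object($"value".cast("string"), "$.column1").alias("column1"),
  get_json_object($"value".cast("string"), "$.column2").cast(IntegerType).alias("column2")
)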