I am trying to read XML data from a Kafka topic using Spark Structured Streaming.
I tried using the Databricks spark-xml package, but I got an error saying that this package does not support streamed reading. Is there any way I can extract XML data from a Kafka topic using Structured Streaming?
My current code:
df = spark \
    .readStream \
    .format("kafka") \
    .format('com.databricks.spark.xml') \
    .options(rowTag="MainElement") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option(subscribeType, "test") \
    .load()
The error:
py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
Kafka itself supports any data format, so XML is no problem at all. It accepts any serializable payload, and since XML is just text, plain string serializers can be used.
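To see that in practice, here is a minimal producer sketch; the topic name and the MainElement root are taken from the question, the payload itself is hypothetical, and the rest is the standard Kafka producer API with plain string serializers:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
// XML is just text, so the stock string serializers are enough
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// hypothetical payload matching the rowTag from the question
producer.send(new ProducerRecord[String, String]("test", "<MainElement><number>42</number></MainElement>"))
producer.close()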
.format("kafka") \ .format('com.databricks.spark.xml') \
The last one, with com.databricks.spark.xml, wins and becomes the streaming source (hiding Kafka as the source). In other words, the above is equivalent to .format('com.databricks.spark.xml') alone.
As you may have experienced, the Databricks spark-xml package does not support streamed reading (i.e. it cannot act as a streaming source). The package is not for streaming.
Is there any way I can extract XML data from a Kafka topic using Structured Streaming?
You are left with accessing and processing the XML yourself with a standard function or a UDF. There's no built-in support for streaming XML processing in Structured Streaming up to Spark 2.2.0.
That should not be a big deal anyway. In Scala, the code could look as follows.
import org.apache.spark.sql.functions.udf
import spark.implicits._

val input = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "test").
  load

val values = input.select('value cast "string")

// parse the XML payload yourself, e.g. in a UDF (see the sketch below)
val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))

// print XMLs and numbers to the stdout
val q = numbersFromXML.
  writeStream.
  format("console").
  start
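For illustration, here is a minimal sketch of the UDF body, assuming payloads rooted at MainElement (the rowTag from the question) with a hypothetical number child element, parsed with the standard scala.xml library:

import scala.xml.XML

val extractValuesFromXML = udf { (xml: String) =>
  // <number> is a hypothetical child element used for this example
  (XML.loadString(xml) \ "number").text.toInt
}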
Another possible solution could be to write your own custom streaming Source that would deal with the XML format in def getBatch(start: Option[Offset], end: Offset): DataFrame. That is supposed to work.
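For completeness, a rough skeleton of that approach, assuming the internal Source API of Spark 2.x (org.apache.spark.sql.execution.streaming.Source); all the actual Kafka reading and XML parsing is left as ???:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.StructType

class XmlKafkaSource extends Source {
  // schema of the rows produced after parsing the XML payloads
  override def schema: StructType = ???

  // the latest offset available in the underlying Kafka topic
  override def getOffset: Option[Offset] = ???

  // read records between the offsets and parse their XML payloads here
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???

  override def stop(): Unit = ()
}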