How to read streaming data in XML format from Kafka?

I am trying to read XML data from Kafka topic using Spark Structured streaming.

I tried using the Databricks spark-xml package, but I got an error saying that this package does not support streamed reading. Is there any way I can extract XML data from Kafka topic using structured streaming?

My current code:

df = spark \
      .readStream \
      .format("kafka") \
      .format('com.databricks.spark.xml') \
      .options(rowTag="MainElement")\
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option(subscribeType, "test") \
      .load()

The error:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
asked Sep 01 '17 by ranjith reddy


People also ask

Does Kafka support XML?

As Kafka supports any data format, XML is no problem at all. Kafka accepts any serializable input format, and XML is just text, so plain string serializers can be used.
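For example, a minimal sketch with the third-party kafka-python client (the topic name test mirrors the question and is an assumption, as is the sample payload) could publish an XML string as plain UTF-8 text:

# Sketch: producing an XML string to Kafka with the kafka-python client.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: v.encode("utf-8"),  # XML is just text
)

# Hypothetical payload; any well-formed XML string is sent the same way.
producer.send("test", "<MainElement><number>42</number></MainElement>")
producer.flush()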

How do I send XML data to Kafka topic?

There are several options: XML integration via third-party middleware, a Kafka Connect connector for integration with XML files, or a Kafka Connect SMT embedded into any other Kafka Connect source or sink connector to transform XML messages on the fly.

How does Kafka stream data?

This quick start follows these steps: Start a Kafka cluster on a single machine. Write example input data to a Kafka topic, using the so-called console producer included in Apache Kafka. Process the input data with a Java application that uses the Kafka Streams library.


1 Answer

.format("kafka") \
.format('com.databricks.spark.xml') \

The last one with com.databricks.spark.xml wins and becomes the streaming source (hiding Kafka as the source).

In other words, the above is equivalent to .format('com.databricks.spark.xml') alone.

As you have experienced, the Databricks spark-xml package does not support streamed reading (i.e. it cannot act as a streaming source); it is not meant for streaming at all.

Is there any way I can extract XML data from Kafka topic using structured streaming?

You are left with accessing and processing the XML yourself with a standard function or a UDF. There's no built-in support for streaming XML processing in Structured Streaming up to Spark 2.2.0.

That should not be a big deal anyway. In Scala, the code could look as follows.

val input = spark.
  readStream.
  format("kafka").
  ...
  load

val values = input.select('value cast "string")

val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))

// print XMLs and numbers to the stdout
val q = numbersFromXML.
  writeStream.
  format("console").
  start
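
Since the question uses PySpark, a rough Python equivalent of the same idea might look as follows (a sketch only; the topic name test, the appName, and the <number> child element inside <MainElement> are illustrative assumptions):

# Sketch: Kafka as the only streaming source, XML parsed in a Python UDF.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import xml.etree.ElementTree as ET

spark = SparkSession.builder.appName("kafka-xml").getOrCreate()

# Kafka is the single .format; the XML payload arrives as bytes in `value`.
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

values = raw.selectExpr("CAST(value AS STRING) AS xml")

def extract_number(xml):
    # Hypothetical extractor: read the <number> child of <MainElement>.
    try:
        return ET.fromstring(xml).findtext("number")
    except ET.ParseError:
        return None

extract_number_udf = udf(extract_number, StringType())
numbers = values.withColumn("number", extract_number_udf(col("xml")))

# Print the extracted values to stdout, as in the Scala sketch above.
query = numbers.writeStream.format("console").start()
query.awaitTermination()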

Another possible solution could be to write your own custom streaming Source that would deal with the XML format in def getBatch(start: Option[Offset], end: Offset): DataFrame. That is supposed to work.

answered by Jacek Laskowski