XML processing in Spark

Tags:

apache-spark

Scenario: My input will be multiple small XML files, and I am supposed to read these XMLs as RDDs, join them with another dataset to form a new RDD, and write the output as XML.

Is it possible to read XML with Spark and load the data as an RDD? If so, how is the XML read?

Sample XML:

<root>
    <users>
        <user>
              <account>1234<\account>
              <name>name_1<\name>
              <number>34233<\number>
         <\user>
         <user>
              <account>58789<\account>
              <name>name_2<\name>
              <number>54697<\number>
         <\user>    
    <\users>
<\root>

How will this be loaded into the RDD?

asked Oct 12 '15 by Pavani

People also ask

Does Spark support XML?

Apache Spark can process and read simple to complex nested XML files into a Spark DataFrame and write them back to XML using the Databricks Spark XML (spark-xml) library.

How does PySpark read XML?

You can use the Databricks spark-xml jar to parse XML into a DataFrame. You can compile the dependency with Maven or sbt, or pass the jar directly to spark-submit, which is useful if you want to use it from PySpark instead of Scala.


4 Answers

Yes, it is possible, but the details will differ depending on the approach you take.

  • If files are small, as you've mentioned, the simplest solution is to load your data using SparkContext.wholeTextFiles. It loads the data as RDD[(String, String)], where the first element is the path and the second is the file content. Then you parse each file individually, as you would in local mode (a minimal sketch follows this list).
  • For larger files you can use Hadoop input formats.
    • If the structure is simple you can split records using textinputformat.record.delimiter. You can find a simple example here. The input is not XML, but it should give you an idea of how to proceed.
    • Otherwise Mahout provides XmlInputFormat
  • Finally, it is possible to read the file using SparkContext.textFile and adjust later for records spanning partition boundaries. Conceptually this means something similar to creating a sliding window or partitioning records into groups of a fixed size:

    • use mapPartitionsWithIndex to identify records broken between partitions and collect the broken records
    • use a second mapPartitionsWithIndex pass to repair the broken records
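A minimal sketch of the wholeTextFiles approach from the first bullet, assuming the closing tags in the sample are corrected to </...> and the files sit under a hypothetical input_xml/ directory (the case class and paths are illustrative):

import scala.xml.XML

case class User(account: String, name: String, number: String)

// one (path, content) pair per file; parse each file with scala-xml
val users = sc.wholeTextFiles("input_xml/*.xml").flatMap { case (_, content) =>
  val root = XML.loadString(content)
  (root \\ "user").map { u =>
    User((u \ "account").text, (u \ "name").text, (u \ "number").text)
  }
}

users is then a plain RDD[User] that can be joined with your other dataset.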

Edit:

There is also the relatively new spark-xml package, which allows you to extract specific records by tag:

val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "foo")
  .load("bar.xml")
answered by zero323

Here's a way to do it using Hadoop input formats to read XML data in Spark, as explained by @zero323.

Input data:

<root>
    <users>
        <user>
            <account>1234<\account>
            <name>name_1<\name>
            <number>34233<\number>
        <\user>
        <user>
            <account>58789<\account>
            <name>name_2<\name>
            <number>54697<\number>
        <\user>
    <\users>
<\root>

Code for reading XML Input:

You can get the required jars at this link.

Imports:

//---------------spark_import
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

//----------------xml_loader_import
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import com.cloudera.datascience.common.XmlInputFormat

Code:

object Tester_loader {
  case class User(account: String, name: String, number: String)
  def main(args: Array[String]): Unit = {

    val sparkHome = "/usr/big_data_tools/spark-1.5.0-bin-hadoop2.6/"
    val sparkMasterUrl = "spark://SYSTEMX:7077"

    val jars = Array(
      "/home/hduser/Offload_Data_Warehouse_Spark.jar",
      "/usr/big_data_tools/JARS/Spark_jar/avro/spark-avro_2.10-2.0.1.jar")

    val conf = new SparkConf().setAppName("XML Reading")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setSparkHome(sparkHome)
      .set("spark.executor.memory", "512m")
      .set("spark.default.deployCores", "12")
      .set("spark.cores.max", "12")
      .setJars(jars)

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // ---- loading user from XML

    // read each <user> ... <\user> block as one record (readFile is defined below)
    val pages = readFile("src/input_data", "<user>", "<\\user>", sc)

    val xmlUserDF = pages.map { tuple =>
      {
        val account = extractField(tuple, "account")
        val name = extractField(tuple, "name")
        val number = extractField(tuple, "number")

        User(account, name, number)
      }
    }.toDF()
    println(xmlUserDF.count())
    xmlUserDF.show()
  }

Functions:

  // Reads every block of text between start_tag and end_tag as one record
  def readFile(path: String, start_tag: String, end_tag: String,
      sc: SparkContext) = {

    val conf = new Configuration()
    conf.set(XmlInputFormat.START_TAG_KEY, start_tag)
    conf.set(XmlInputFormat.END_TAG_KEY, end_tag)
    val rawXmls = sc.newAPIHadoopFile(
        path, classOf[XmlInputFormat], classOf[LongWritable],
        classOf[Text], conf)

    // keep only the record text, dropping the byte-offset key
    rawXmls.map(p => p._2.toString)
  }

  // Extracts the text between <tag> and </tag>, after normalising the
  // malformed <\...> closing tags in the input to </...>
  def extractField(tuple: String, tag: String) = {
    var value = tuple.replaceAll("\n", " ").replace("<\\", "</")

    if (value.contains("<" + tag + ">") &&
        value.contains("</" + tag + ">")) {
      value = value.split("<" + tag + ">")(1).split("</" + tag + ">")(0)
    }
    value
  }

}

Output:

+-------+------+------+
|account|  name|number|
+-------+------+------+
|   1234|name_1| 34233|
|  58789|name_2| 54697|
+-------+------+------+

The result is a DataFrame; you can convert it to an RDD as needed, like this:

val xmlUserRDD = xmlUserDF.rdd.map { x =>
  (x.get(0).toString, x.get(1).toString, x.get(2).toString) }

Please try it out and see if it helps.

answered by Kshitij Kulshrestha

This will help you.

package packagename;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import com.databricks.spark.xml.XmlReader;

public class XmlreaderSpark {
    public static void main(String arr[]) {
        String localxml = "file path";
        String booksFileTag = "user";

        String warehouseLocation = "file:" + System.getProperty("user.dir") + "/spark-warehouse";
        System.out.println("warehouseLocation: " + warehouseLocation);

        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Java Spark SQL Example")
                .config("spark.some.config.option", "some-value")
                .config("spark.sql.warehouse.dir", warehouseLocation)
                .config("spark.sql.crossJoin.enabled", "true")
                .enableHiveSupport()
                .getOrCreate();
        SQLContext sqlContext = new SQLContext(spark);

        // spark-xml: treat each <user> element as one row
        Dataset<Row> df = new XmlReader().withRowTag(booksFileTag).xmlFile(sqlContext, localxml);
        df.show();
    }
}

You need to add this dependency in your POM.xml:

<dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-xml_2.10</artifactId>
   <version>0.4.0</version>
</dependency>
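If you prefer not to manage the jar manually, the same artifact can usually be pulled in at submit time via spark-submit's --packages option (for example --packages com.databricks:spark-xml_2.10:0.4.0, matching the coordinates above).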

Also note that your input file is not well-formed XML: the closing tags use <\...> instead of </...>.

Thanks.

answered by ROOT

There are two good options for simple cases:

  • wholeTextFiles. Use the map method with your XML parser, which could be the Scala XML pull parser (quicker to code) or a SAX pull parser (better performance).
  • Hadoop streaming XMLInputFormat, for which you must define the start and end tags (<user> and </user>); note, however, that it creates one record per user tag.
  • The spark-xml package is a good option too.

With all of these options you are limited to processing simple XML that can be interpreted as a dataset with rows and columns.

However, if the structure gets a little more complex, those options become less useful.

For example, if you have one more entity there:

<root>
    <users>
        <user>...</user>
        ...
    </users>
    <companies>
        <company>...</company>
        ...
    </companies>
</root>

Now you need to generate 2 RDDs and change your parser to recognise the <company> tag.
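With spark-xml, for instance, that means something like one read per row tag over the same file (a sketch; the file name is illustrative):

val users = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "user")
  .load("data.xml")

val companies = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "company")
  .load("data.xml")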

This is just a simple case, but the XML could be much more complex and you would need to include more and more changes.

To solve this complexity we've built Flexter on top of Apache Spark to take the pain out of processing XML files on Spark. I also recommend reading about converting XML on Spark to Parquet. The latter post also includes some code samples that show how the output can be queried with SparkSQL.

Disclaimer: I work for Sonra

answered by Uli Bethke