 

Spark for JSON Data

I am processing a nested, complex JSON document, and below is its schema.

root
 |-- businessEntity: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- payGroup: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- reportingPeriod: struct (nullable = true)
 |    |    |    |    |    |-- worker: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- category: string (nullable = true)
 |    |    |    |    |    |    |    |-- person: struct (nullable = true)
 |    |    |    |    |    |    |    |-- tax: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- qtdAmount: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- ytdAmount: double (nullable = true)

My requirement is to create a hashmap with code concatenated with qtdAmount as the key and the value of qtdAmount as the value, i.e. Map.put(code + "qtdAmount", qtdAmount). How can I do this with Spark?
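For reference, here is a plain-Scala sketch (not Spark-specific; the `Tax` case class and the sample values are invented for illustration) of the map shape being asked for:

```scala
// Hypothetical model of one element of the tax array from the schema above.
case class Tax(code: String, qtdAmount: Double, ytdAmount: Double)

// Sample records standing in for the flattened tax array.
val taxes = Seq(Tax("FED", 10.5, 42.0), Tax("STATE", 3.25, 13.0))

// Build the requested map: key = code + "qtdAmount", value = qtdAmount.
val taxMap: Map[String, Double] =
  taxes.map(t => (t.code + "qtdAmount") -> t.qtdAmount).toMap
// taxMap("FEDqtdAmount") == 10.5
```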

I tried the shell commands below.

import org.apache.spark.sql._

val sqlcontext = new SQLContext(sc)
val cdm = sqlcontext.read.json("/user/edureka/CDM/cdm.json")
val spark = SparkSession.builder().appName("SQL").config("spark.some.config.option", "some-value").getOrCreate()
cdm.createOrReplaceTempView("CDM")

val sqlDF = spark.sql("SELECT businessEntity[0].payGroup[0] FROM CDM").show()
val address = spark.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker[0].person.address FROM CDM")
val worker = spark.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker FROM CDM")
val tax = spark.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax FROM CDM")
tax.select("tax.code")

val codes = tax.select(explode(tax("code")))
val exploded = tax.withColumn("code", explode(tax("tax.code"))).withColumn("qtdAmount", explode(tax("tax.qtdAmount"))).withColumn("ytdAmount", explode(tax("tax.ytdAmount")))

I am trying to get all the codes and qtdAmounts into a map, but I am not getting it: using multiple explode statements on a single DataFrame produces a cartesian product of the elements.
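The cartesian product comes from exploding `code`, `qtdAmount`, and `ytdAmount` as three independent arrays. A plain-Scala illustration of the difference (no Spark needed; the tax structs are modelled here as tuples with invented values):

```scala
// The tax array holds structs; model them as (code, qtdAmount) tuples.
val tax = Seq(("FED", 10.5), ("STATE", 3.25))

// Flattening the struct array once keeps code and qtdAmount paired: 2 rows.
val paired = tax.map { case (code, qtd) => (code, qtd) }

// Flattening code and qtdAmount separately pairs every code with every
// amount -- a cartesian product: 4 rows instead of 2.
val crossed = for { c <- tax.map(_._1); q <- tax.map(_._2) } yield (c, q)
// paired.size == 2, crossed.size == 4
```

The Spark equivalent is a single `explode` on the `tax` column followed by selecting fields from the resulting struct, e.g. `explode($"...tax").as("t")` and then `$"t.code", $"t.qtdAmount"` (a sketch; the exact column path depends on how the outer arrays are flattened first).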

Could someone please help with how to parse JSON this complex in Spark?

asked Apr 24 '26 by Sivaram Kappaganthu

1 Answer

You can get code and qtdAmount in this way:

import sqlcontext.implicits._

cdm.select(
  $"businessEntity.payGroup.reportingPeriod.worker.tax.code".as("code"),
  $"businessEntity.payGroup.reportingPeriod.worker.tax.qtdAmount".as("qtdAmount")
).show

(Note that `element` in the printed schema is only how `printSchema` displays array elements; it is not a selectable field, so it is omitted from the column paths.)
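Because the fields sit under nested arrays, the two columns come back as (nested) arrays of codes and amounts rather than scalars. A plain-Scala sketch (sample values invented) of combining such parallel arrays into the requested map once collected and flattened:

```scala
val codes = Seq("FED", "STATE")   // stand-in for the collected code array
val amounts = Seq(10.5, 3.25)     // stand-in for the collected qtdAmount array

// Zip the parallel arrays, then build key = code + "qtdAmount".
val taxMap: Map[String, Double] =
  codes.zip(amounts).map { case (c, a) => (c + "qtdAmount") -> a }.toMap
```

On Spark 2.4+ the `map_from_arrays` function can build a map column directly from two array columns, which may avoid collecting to the driver first (an alternative, not what the answer above uses).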

For detailed information, check this

answered Apr 27 '26 by Shankar