
Spark union fails with nested JSON dataframe

I have the following two JSON files:

{
    "name" : "Agent1",
    "age" : "32",
    "details" : [{
            "d1" : 1,
            "d2" : 2
        }
    ]
}

{
    "name" : "Agent2",
    "age" : "42",
    "details" : []
}

I read them with Spark:

val jsonDf1 = spark.read.json(pathToJson1)
val jsonDf2 = spark.read.json(pathToJson2)

Two DataFrames are created with the following schemas:

root
 |-- age: string (nullable = true)
 |-- details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- d1: long (nullable = true)
 |    |    |-- d2: long (nullable = true)
 |-- name: string (nullable = true)

root
 |-- age: string (nullable = true)
 |-- details: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)

When I try to perform a union of these two DataFrames, I get this error:

jsonDf1.union(jsonDf2)


org.apache.spark.sql.AnalysisException: unresolved operator 'Union;;
'Union
:- LogicalRDD [age#0, details#1, name#2]
+- LogicalRDD [age#7, details#8, name#9]

How can I resolve this? The JSON files the Spark job loads will sometimes contain empty arrays, but it still has to union the resulting DataFrames, which shouldn't be a problem since the schema of the JSON files is the same.

asked Mar 01 '17 by morm

2 Answers

If you try to union the two DataFrames directly, you will get this error:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. ArrayType(StringType,true) <> ArrayType(StructType(StructField(d1,StringType,true), StructField(d2,StringType,true)),true) at the second column of the second table
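
As a quick sanity check (a minimal sketch, not part of the original answer), you can compare the two inferred schemas before attempting the union:

// Sketch: confirm the mismatch between the inferred schemas
jsonDf1.printSchema()
jsonDf2.printSchema()

if (jsonDf1.schema != jsonDf2.schema) {
  println("Schemas differ, so a plain union will fail")
}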

JSON files arrive at the same time

To solve this problem, if you can read the JSON files at the same time, I would suggest:

val jsonDf1 = spark.read.json("json1.json", "json2.json")

This will give this schema:

jsonDf1.printSchema
root
 |-- age: string (nullable = true)
 |-- details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- d1: long (nullable = true)
 |    |    |-- d2: long (nullable = true)
 |-- name: string (nullable = true)

The data output:

jsonDf1.show(10,truncate = false)
+---+-------+------+
|age|details|name  |
+---+-------+------+
|32 |[[1,2]]|Agent1|
|42 |null   |Agent2|
+---+-------+------+

JSON files arrive at different times

If your JSON files arrive at different times, as a default solution I would recommend reading a template JSON object containing a full array alongside the real file; that keeps the DataFrame valid for any union even when the real file has an empty array. Then, filter out this fake JSON record before outputting the result:

val df = spark.read.json("jsonWithMaybeAnEmptyArray.json", 
"TemplateFakeJsonWithAFullArray.json")

import spark.implicits._ // needed for the $"column" syntax

df.filter($"name" =!= "FakeAgent").show(1)
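
For illustration only, the template file could look something like this (the "FakeAgent" name comes from the filter above; the age and d1/d2 values are placeholders):

{
    "name" : "FakeAgent",
    "age" : "0",
    "details" : [{
            "d1" : 0,
            "d2" : 0
        }
    ]
}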

Please note: a JIRA ticket has been opened to improve the capability to merge SQL data types (https://issues.apache.org/jira/browse/SPARK-19536), and this kind of operation should become possible in a future Spark version.
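
As an aside that is not part of the original answer: if the expected schema is known up front, another workaround is to pass an explicit schema to the reader, so an empty details array is never inferred as array<string> and the union succeeds. A minimal sketch, assuming the schema from the question:

import org.apache.spark.sql.types._

// Declare the expected schema explicitly so an empty "details" array keeps
// the struct element type instead of being inferred as array<string>
val agentSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", StringType, nullable = true),
  StructField("details", ArrayType(StructType(Seq(
    StructField("d1", LongType, nullable = true),
    StructField("d2", LongType, nullable = true)
  ))), nullable = true)
))

val df1 = spark.read.schema(agentSchema).json("json1.json")
val df2 = spark.read.schema(agentSchema).json("json2.json")
df1.union(df2).show(false)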

answered Oct 03 '22 by Paul Leclercq


polomarcus's answer led me to this solution. I couldn't read all the files in a single call because I get a list of files as input, and Spark's json() reader takes varargs rather than a List of paths, but with Scala's varargs expansion it's possible to do this:

val files = List("path1", "path2", "path3")
val dataframe = spark.read.json(files: _*)

This way I got one dataframe containing all three files.
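
A small aside (assuming a directory layout not stated in the question): if the files all sit under one directory, a glob path works as well, since the reader accepts Hadoop-style path patterns:

// Hypothetical path; reads every matching JSON file in the directory
val dataframe = spark.read.json("/path/to/agents/*.json")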

answered Oct 03 '22 by morm