
Reading a nested JSON file in PySpark

Tags: json, pyspark

I'd like to create a PySpark DataFrame from a JSON file in HDFS.

The JSON file has the following content:

{ "Product": { "0": "Desktop Computer", "1": "Tablet", "2": "iPhone", "3": "Laptop" }, "Price": { "0": 700, "1": 250, "2": 800, "3": 1200 } }

Then I read this file using PySpark 2.4.4:

df = spark.read.json("/path/file.json")

So, I get a result like this:

df.show(truncate=False)
+---------------------+---------------------------------+
|Price                |Product                          |
+---------------------+---------------------------------+
|[700, 250, 800, 1200]|[Desktop, Tablet, Iphone, Laptop]|
+---------------------+---------------------------------+

But I'd like a dataframe with the following structure:

+-------+--------+
|Price  |Product |
+-------+--------+
|700    |Desktop | 
|250    |Tablet  |
|800    |Iphone  |
|1200   |Laptop  |
+-------+--------+

How can I get a DataFrame with the structure shown above using PySpark?

I tried to use explode:

df.select(explode("Price"))

but I got the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o688.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;;
'Project [explode(Price#107) AS List()]
+- LogicalRDD [Price#107, Product#108], false

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:97)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3301)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:1312)
    at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-46-463397adf153> in <module>
----> 1 df.select(explode("Price"))

/usr/lib/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1200         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1201         """
-> 1202         jdf = self._jdf.select(self._jcols(*cols))
   1203         return DataFrame(jdf, self.sql_ctx)
   1204 

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;;\n'Project [explode(Price#107) AS List()]\n+- LogicalRDD [Price#107, Product#108], false\n"
Master_RDA asked Sep 05 '19 18:09


2 Answers

Recreating your DataFrame:

from pyspark.sql import functions as F

df = spark.read.json("./row.json") 
df.printSchema()
#root
# |-- Price: struct (nullable = true)
# |    |-- 0: long (nullable = true)
# |    |-- 1: long (nullable = true)
# |    |-- 2: long (nullable = true)
# |    |-- 3: long (nullable = true)
# |-- Product: struct (nullable = true)
# |    |-- 0: string (nullable = true)
# |    |-- 1: string (nullable = true)
# |    |-- 2: string (nullable = true)
# |    |-- 3: string (nullable = true)

As shown above in the printSchema output, your Price and Product columns are structs. Thus explode will not work since it requires an ArrayType or MapType.
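To make the target shape concrete without Spark, the same reshaping can be sketched in plain Python with the standard json module. This is only an illustration for small inputs, not part of the Spark solution: `columns_to_rows` is a hypothetical helper, and it assumes every inner key is a numeric string, as in the sample file.

```python
import json

# The sample document from the question: one object per column,
# each keyed by the row index as a string.
raw = """
{ "Product": { "0": "Desktop Computer", "1": "Tablet", "2": "iPhone", "3": "Laptop" },
  "Price": { "0": 700, "1": 250, "2": 800, "3": 1200 } }
"""


def columns_to_rows(doc):
    """Transpose {"column": {"index": value}} into a list of row dicts."""
    # Collect every row index used by any column, ordered numerically
    # (assumes the indices are numeric strings such as "0", "1", ...).
    indices = sorted({i for col in doc.values() for i in col}, key=int)
    # Build one dict per index; a column missing that index yields None.
    return [{name: col.get(i) for name, col in doc.items()} for i in indices]


rows = columns_to_rows(json.loads(raw))
for row in rows:
    print(row["Price"], row["Product"])
```

Each inner object has a fixed set of named fields, which is why Spark infers a struct rather than an array. For a file small enough to parse on the driver, rows like these could also be handed to spark.createDataFrame, but the answers here keep the work inside Spark.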

First, convert the structs to arrays using the .* notation, as shown in "Querying Spark SQL DataFrame with complex types":

df = df.select(
    F.array(F.expr("Price.*")).alias("Price"),
    F.array(F.expr("Product.*")).alias("Product")
)

df.printSchema()

#root
# |-- Price: array (nullable = false)
# |    |-- element: long (containsNull = true)
# |-- Product: array (nullable = false)
# |    |-- element: string (containsNull = true)

Now since you're using Spark 2.4+, you can use arrays_zip to zip the Price and Product arrays together, before using explode:

df.withColumn("price_product", F.explode(F.arrays_zip("Price", "Product")))\
    .select("price_product.Price", "price_product.Product")\
    .show()

#+-----+----------------+
#|Price|         Product|
#+-----+----------------+
#|  700|Desktop Computer|
#|  250|          Tablet|
#|  800|          iPhone|
#| 1200|          Laptop|
#+-----+----------------+

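For older versions of Spark, before arrays_zip, you can explode each column separately and join the results back together on a generated id. This relies on both exploded DataFrames receiving matching ids, and the join does not preserve the original row order (see the output below):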

# Explode each array separately and tag every row with a generated id.
df1 = (
    df.withColumn("price_map", F.explode("Price"))
    .withColumn("id", F.monotonically_increasing_id())
    .drop("Price", "Product")
)

df2 = (
    df.withColumn("product_map", F.explode("Product"))
    .withColumn("id", F.monotonically_increasing_id())
    .drop("Price", "Product")
)

# Join the two exploded frames on the generated id, then drop it.
df3 = df1.join(df2, "id", "outer").drop("id")

df3.show()

#+---------+----------------+
#|price_map|     product_map|
#+---------+----------------+
#|      700|Desktop Computer|
#|      250|          Tablet|
#|     1200|          Laptop|
#|      800|          iPhone|
#+---------+----------------+
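The join above only works if element N of one array ends up with the same id as element N of the other. The underlying idea, pairing elements by their position, can be sketched in plain Python as below. This is an analogy rather than Spark code; in PySpark, posexplode (which returns each element's position along with its value) is one way to get an explicit position to join on instead of a generated id.

```python
prices = [700, 250, 800, 1200]
products = ["Desktop Computer", "Tablet", "iPhone", "Laptop"]

# Tag each element with its position in the array.
price_by_pos = dict(enumerate(prices))
product_by_pos = dict(enumerate(products))

# Outer join on position: keep every position seen on either side,
# filling the missing side with None.
positions = sorted(set(price_by_pos) | set(product_by_pos))
joined = [(price_by_pos.get(p), product_by_pos.get(p)) for p in positions]

for price, product in joined:
    print(price, product)
```

Sorting by position before building the result is what restores the original order, which the id-based outer join does not do on its own.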
thePurplePython answered Oct 20 '22 14:10


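As an alternative to arrays_zip, we can also build a map from the two arrays. Note that map_from_arrays, used below, was itself added in Spark 2.4, so this is not a fallback for older versions: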

First, read the JSON file into a DataFrame:

from pyspark.sql import functions as F

df = spark.read.json("your_json_file.json")
df.show(truncate=False)

+---------------------+------------------------------------------+
|Price                |Product                                   |
+---------------------+------------------------------------------+
|[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|
+---------------------+------------------------------------------+

Next, expand each struct into an array:

df = df.withColumn('prc_array', F.array(F.expr('Price.*')))
df = df.withColumn('prod_array', F.array(F.expr('Product.*')))

Then create a map between the two arrays:

df = df.withColumn('prc_prod_map', F.map_from_arrays('prc_array', 'prod_array'))
df.select('prc_array', 'prod_array', 'prc_prod_map').show(truncate=False)


+---------------------+------------------------------------------+-----------------------------------------------------------------------+
|prc_array            |prod_array                                |prc_prod_map                                                           |
+---------------------+------------------------------------------+-----------------------------------------------------------------------+
|[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|[700 -> Desktop Computer, 250 -> Tablet, 800 -> iPhone, 1200 -> Laptop]|
+---------------------+------------------------------------------+-----------------------------------------------------------------------+

Finally, apply explode on the map:

df = df.select(F.explode('prc_prod_map').alias('prc', 'prod'))
df.show(truncate=False)

+----+----------------+
|prc |prod            |
+----+----------------+
|700 |Desktop Computer|
|250 |Tablet          |
|800 |iPhone          |
|1200|Laptop          |
+----+----------------+

This way, we avoid the potentially time-consuming join operation on two tables.
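One thing to watch with this approach: the prices act as map keys, so repeated prices deserve a check before relying on it. The plain-Python analogy below uses made-up values (a second 700 and a "Monitor" product that are not in the original data) to show how a key-to-value structure differs from positional pairs. It does not reproduce Spark's behavior, which for duplicate map keys depends on the Spark version and configuration.

```python
# Hypothetical data with a repeated price, not from the original question.
prices = [700, 250, 700]
products = ["Desktop Computer", "Tablet", "Monitor"]

# Key -> value: a Python dict keeps only the last value for a repeated key.
as_map = dict(zip(prices, products))

# Positional pairs: every (price, product) row survives.
as_pairs = list(zip(prices, products))

print(len(as_map), "entries in the map")
print(len(as_pairs), "positional pairs")
```

When prices may repeat, pairing by position (as arrays_zip does) is the safer choice.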

niuer answered Oct 20 '22 13:10