 

Explode (transpose?) multiple columns in Spark SQL table

I am using Spark SQL (I mention that it's Spark in case that affects the SQL syntax; I'm not familiar enough with it to be sure yet) and I have a table that I am trying to restructure, but I'm getting stuck trying to transpose multiple columns at the same time.

Basically I have data that looks like:

userId    someString    varA        varB
1         "example1"    [0,2,5]     [1,2,9]
2         "example2"    [1,20,5]    [9,null,6]

and I'd like to explode both varA and varB simultaneously (the length will always be consistent) - so that the final output looks like this:

userId    someString    varA    varB
1         "example1"    0       1
1         "example1"    2       2
1         "example1"    5       9
2         "example2"    1       9
2         "example2"    20      null
2         "example2"    5       6

but I can only seem to get a single explode(var) statement to work in one command, and if I try to chain them (i.e. create a temp table after the first explode command) then I obviously get a huge number of duplicate, unnecessary rows.

Many thanks!

asked Oct 19 '15 by anthr

People also ask

What is explode in spark SQL?

Spark SQL's explode function is used to split an array or map DataFrame column into rows. Spark defines several flavors of this function: explode_outer, which keeps rows whose array is null or empty; posexplode, which also returns the position of each element; and posexplode_outer, which combines both behaviors.
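A minimal sketch of these flavors in Scala (assuming a spark-shell style session where spark.implicits._ is available; the toy data here is made up for illustration):

import spark.implicits._
import org.apache.spark.sql.functions.{explode, explode_outer, posexplode}

// Hypothetical toy data: id 2 has an empty array
val demo = Seq((1, Seq("a", "b")), (2, Seq.empty[String])).toDF("id", "xs")

demo.select($"id", explode($"xs")).show()        // drops id 2 (empty array)
demo.select($"id", explode_outer($"xs")).show()  // keeps id 2 with a null row
demo.select($"id", posexplode($"xs")).show()     // adds a pos column with each element's index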

How do I select multiple columns in spark?

You can select one or more columns of a Spark DataFrame by passing the column names you want to the select() function. Since DataFrames are immutable, this creates a new DataFrame containing only the selected columns. The show() function is used to display the DataFrame contents.
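For instance, reusing the question's df (a sketch, not verbatim from the answer):

// select() returns a new, immutable DataFrame with just these columns
df.select("userId", "someString").show()

// The same selection using Column expressions
df.select($"userId", $"someString").show()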

How do you transpose rows into columns in spark?

Spark SQL provides a pivot() function to rotate data from one column into multiple columns (transposing rows to columns). It is an aggregation in which the distinct values of one grouping column become individual output columns.
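A small, hypothetical illustration (the sales data is invented, not from the question):

// One row per (year, quarter) sale
val sales = Seq(("2015", "Q1", 10), ("2015", "Q2", 20), ("2016", "Q1", 15))
  .toDF("year", "quarter", "amount")

// Each distinct quarter value becomes its own output column (Q1, Q2)
sales.groupBy("year").pivot("quarter").sum("amount").show()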


1 Answer

Spark >= 2.4

You can skip the zip udf and use the arrays_zip function:

df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(   $"userId", $"someString",   $"vars.varA", $"vars.varB").show 

Spark < 2.4

What you want is not possible without a custom UDF. In Scala you could do something like this:

val data = sc.parallelize(Seq(
  """{"userId": 1, "someString": "example1",
      "varA": [0, 2, 5], "varB": [1, 2, 9]}""",
  """{"userId": 2, "someString": "example2",
      "varA": [1, 20, 5], "varB": [9, null, 6]}"""
))

val df = spark.read.json(data)

df.printSchema
// root
//  |-- someString: string (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- varA: array (nullable = true)
//  |    |-- element: long (containsNull = true)
//  |-- varB: array (nullable = true)
//  |    |-- element: long (containsNull = true)

Now we can define a zip udf:

import org.apache.spark.sql.functions.{udf, explode}

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
  $"userId", $"someString",
  $"vars._1".alias("varA"), $"vars._2".alias("varB")).show

// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// |     1|  example1|   0|   1|
// |     1|  example1|   2|   2|
// |     1|  example1|   5|   9|
// |     2|  example2|   1|   9|
// |     2|  example2|  20|null|
// |     2|  example2|   5|   6|
// +------+----------+----+----+

With raw SQL:

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.registerTempTable("df")

sqlContext.sql(
  """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")
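Note that this query returns vars as a single struct column; a hedged follow-up that unpacks it back into the original column names:

sqlContext.sql(
  """SELECT userId, someString, vars._1 AS varA, vars._2 AS varB
     FROM (SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df) t""")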
answered Sep 20 '22 by zero323