I am using Spark SQL (I mention that it is in Spark in case that affects the SQL syntax - I'm not familiar enough to be sure yet) and I have a table that I am trying to re-structure, but I'm getting stuck trying to transpose multiple columns at the same time.
Basically I have data that looks like:
    userId    someString    varA        varB
    1         "example1"    [0,2,5]     [1,2,9]
    2         "example2"    [1,20,5]    [9,null,6]
and I'd like to explode both varA and varB simultaneously (the length will always be consistent) - so that the final output looks like this:
    userId    someString    varA    varB
    1         "example1"    0       1
    1         "example1"    2       2
    1         "example1"    5       9
    2         "example2"    1       9
    2         "example2"    20      null
    2         "example2"    5       6
but I can only seem to get a single explode(var) statement to work in one command, and if I try to chain them (i.e. create a temp table after the first explode command) then I get a huge number of duplicate, unnecessary rows - effectively the Cartesian product of the two arrays.
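For reference, a minimal sketch of the chained approach I mean (assuming the table is loaded as a DataFrame named df) - this is the version that multiplies rows:

    import org.apache.spark.sql.functions.explode

    // Each explode pass expands rows independently, so two 3-element
    // arrays produce 9 rows per user instead of the 3 I want.
    df.withColumn("varA", explode($"varA"))
      .withColumn("varB", explode($"varB"))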
Many thanks!
The Spark SQL explode function is used to split an array or map column of a DataFrame into rows. Spark defines several flavors of this function: explode_outer, which also handles nulls and empty arrays; posexplode, which explodes together with the position of each element; and posexplode_outer, which combines the two.
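As a quick illustration (a minimal sketch, assuming a DataFrame df with the array columns from the question):

    import org.apache.spark.sql.functions.{explode_outer, posexplode}

    // posexplode emits a `pos` column with each element's index
    // alongside the exploded `col` value
    df.select($"userId", posexplode($"varA")).show

    // explode_outer keeps rows whose array is null or empty
    // (the exploded value becomes null instead of dropping the row)
    df.select($"userId", explode_outer($"varB")).show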
You can select one or more columns of a Spark DataFrame by passing the column names to the select() function. Since a DataFrame is immutable, this creates a new DataFrame with the selected columns. The show() function prints the DataFrame contents.
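For example (hypothetical usage, same df as above):

    // Returns a new two-column DataFrame; the original df is unchanged
    df.select("userId", "someString").show()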
Spark SQL also provides a pivot() function to rotate the values of one column into multiple columns (transposing rows to columns). It is an aggregation in which the distinct values of one of the grouping columns become individual columns.
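A loose sketch of pivot() for orientation (hypothetical - it is not what the answer below uses, but shows the row-to-column direction):

    import org.apache.spark.sql.functions.first

    // Distinct values of someString ("example1", "example2")
    // become columns, with first(varA) as the aggregated cell value
    df.groupBy($"userId").pivot("someString").agg(first($"varA"))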
Spark >= 2.4
You can skip the zip udf and use the arrays_zip function:
df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select( $"userId", $"someString", $"vars.varA", $"vars.varB").show
Spark < 2.4
What you want is not possible without a custom UDF. In Scala you could do something like this:
    val data = sc.parallelize(Seq(
      """{"userId": 1, "someString": "example1", "varA": [0, 2, 5], "varB": [1, 2, 9]}""",
      """{"userId": 2, "someString": "example2", "varA": [1, 20, 5], "varB": [9, null, 6]}"""
    ))

    val df = spark.read.json(data)

    df.printSchema
    // root
    //  |-- someString: string (nullable = true)
    //  |-- userId: long (nullable = true)
    //  |-- varA: array (nullable = true)
    //  |    |-- element: long (containsNull = true)
    //  |-- varB: array (nullable = true)
    //  |    |-- element: long (containsNull = true)
Now we can define the zip udf:
    import org.apache.spark.sql.functions.{udf, explode}

    val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

    df.withColumn("vars", explode(zip($"varA", $"varB")))
      .select($"userId", $"someString", $"vars._1".alias("varA"), $"vars._2".alias("varB"))
      .show

    // +------+----------+----+----+
    // |userId|someString|varA|varB|
    // +------+----------+----+----+
    // |     1|  example1|   0|   1|
    // |     1|  example1|   2|   2|
    // |     1|  example1|   5|   9|
    // |     2|  example2|   1|   9|
    // |     2|  example2|  20|null|
    // |     2|  example2|   5|   6|
    // +------+----------+----+----+
With raw SQL:
    sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
    df.registerTempTable("df")

    sqlContext.sql(
      """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")
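Note that this raw SQL version leaves vars as a single struct column. A sketch of flattening it in the same style (the subquery alias tmp is mine):

    sqlContext.sql(
      """SELECT userId, someString, vars._1 AS varA, vars._2 AS varB
        |FROM (SELECT userId, someString, explode(zip(varA, varB)) AS vars
        |      FROM df) tmp""".stripMargin)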