There was a question regarding this issue here:
Explode (transpose?) multiple columns in Spark SQL table
Suppose that we have extra columns as below:
**userId someString varA varB varC varD**
1 "example1" [0,2,5] [1,2,9] [a,b,c] [red,green,yellow]
2 "example2" [1,20,5] [9,null,6] [d,e,f] [white,black,cyan]
To conclude an output like below:
userId someString varA varB varC varD
1 "example1" 0 1 a red
1 "example1" 2 2 b green
1 "example1" 5 9 c yellow
2 "example2" 1 9 d white
2 "example2" 20 null e black
2 "example2" 5 6 f Cyan
The answer was by defining a udf
as:
val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
and defining "withColumn".
df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
$"userId", $"someString",
$"vars._1".alias("varA"), $"vars._2".alias("varB")).show
If we need to extend the above answer, with more columns, what is the easiest way to amend the above code. Any help please.
Select Single & Multiple Columns You can select the single or multiple columns of the Spark DataFrame by passing the column names you wanted to select to the select() function. Since DataFrame is immutable, this creates a new DataFrame with a selected columns. show() function is used to show the DataFrame contents.
PySpark function explode(e: Column) is used to explode or create array or map columns to rows. When an array is passed to this function, it creates a new default column “col1” and it contains all array elements.
The Spark DataFrame provides the drop() method to drop the column or the field from the DataFrame or the Dataset. The drop() method is also used to remove the multiple columns from the Spark DataFrame or the Database.
Returns a new row for each element in the given array or map. Uses the default column name col for elements in the array and key and value for elements in the map unless specified otherwise.
The approach with the zip
udf seems ok, but you need to extend if for more collections. Unfortunately there is no really nice way to zip 4 Seqs, but this should work:
def assertSameSize(arrs:Seq[_]*) = {
assert(arrs.map(_.size).distinct.size==1,"sizes differ")
}
val zip4 = udf((xa:Seq[Long],xb:Seq[Long],xc:Seq[String],xd:Seq[String]) => {
assertSameSize(xa,xb,xc,xd)
xa.indices.map(i=> (xa(i),xb(i),xc(i),xd(i)))
}
)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With