
Explode multiple columns in Spark SQL table

There was a question regarding this issue here:

Explode (transpose?) multiple columns in Spark SQL table

Suppose that we have extra columns as below:

userId    someString      varA       varB         varC     varD
   1      "example1"      [0,2,5]    [1,2,9]      [a,b,c]  [red,green,yellow]
   2      "example2"      [1,20,5]   [9,null,6]   [d,e,f]  [white,black,cyan]

We want to produce an output like the one below:

userId    someString      varA     varB   varC     varD
   1      "example1"       0         1     a       red
   1      "example1"       2         2     b       green
   1      "example1"       5         9     c       yellow
   2      "example2"       1         9     d       white
   2      "example2"       20       null   e       black
   2      "example2"       5         6     f       cyan

The accepted answer there defined a zip udf:

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

and applied it with withColumn:

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
   $"userId", $"someString",
   $"vars._1".alias("varA"), $"vars._2".alias("varB")).show
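
Outside of Spark, the pairing that the udf performs is just Scala's own Seq.zip; a minimal sketch (plain Scala, no Spark required, values taken from the varA/varB columns above):

```scala
// Element-wise pairing: exactly what the zip udf computes for one row
val varA = Seq(0L, 2L, 5L)
val varB = Seq(1L, 2L, 9L)

val zipped = varA.zip(varB)
println(zipped) // List((0,1), (2,2), (5,9))
```

explode then turns each pair into its own row, and vars._1 / vars._2 unpack the tuple fields into separate columns.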

If we need to extend that answer to more columns, what is the easiest way to amend the code? Any help please.

Asked Jul 29 '17 by Mohd Zoubi


People also ask

How do I select multiple columns in Spark?

You can select single or multiple columns of a Spark DataFrame by passing the column names you want to the select() function. Since DataFrames are immutable, this creates a new DataFrame with the selected columns; the show() function displays the DataFrame contents.

How do you explode columns in PySpark?

The PySpark function explode(e: Column) is used to explode array or map columns into rows. When an array is passed to this function, it creates a new default column "col" containing all the array elements.

How do I drop multiple columns in Spark DataFrame?

Spark provides the drop() method to remove a column or field from a DataFrame or Dataset. The drop() method can also be used to remove multiple columns at once.

How do you explode in Spark?

Returns a new row for each element in the given array or map. Uses the default column name col for elements in the array and key and value for elements in the map unless specified otherwise.


1 Answer

The approach with the zip udf seems fine, but you need to extend it for more collections. Unfortunately there is no really nice way to zip 4 Seqs, but this should work:

// fail fast if the arrays in one row have different lengths
def assertSameSize(arrs: Seq[_]*) = {
  assert(arrs.map(_.size).distinct.size == 1, "sizes differ")
}

// zip four sequences element-wise into a sequence of 4-tuples
val zip4 = udf((xa: Seq[Long], xb: Seq[Long], xc: Seq[String], xd: Seq[String]) => {
  assertSameSize(xa, xb, xc, xd)
  xa.indices.map(i => (xa(i), xb(i), xc(i), xd(i)))
})
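
The Spark-free core of zip4 can be exercised on its own; a sketch using the sample values from the question (zip4Core is a hypothetical stand-in for the udf body):

```scala
// Same size check as in the answer
def assertSameSize(arrs: Seq[_]*): Unit =
  assert(arrs.map(_.size).distinct.size == 1, "sizes differ")

// Same logic as the udf body: walk the indices and build 4-tuples
def zip4Core(xa: Seq[Long], xb: Seq[Long], xc: Seq[String], xd: Seq[String]) = {
  assertSameSize(xa, xb, xc, xd)
  xa.indices.map(i => (xa(i), xb(i), xc(i), xd(i)))
}

val rows = zip4Core(Seq(0L, 2L, 5L), Seq(1L, 2L, 9L),
                    Seq("a", "b", "c"), Seq("red", "green", "yellow"))
println(rows) // Vector((0,1,a,red), (2,2,b,green), (5,9,c,yellow))
```

With the udf registered, the rest follows the two-column pattern from the question: `df.withColumn("vars", explode(zip4($"varA", $"varB", $"varC", $"varD")))`, then select `$"vars._1"` through `$"vars._4"` with the desired aliases.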
Answered Oct 29 '22 by Raphael Roth