 

How to melt Spark DataFrame?

Is there an equivalent of the Pandas melt function in Apache Spark, in PySpark or at least in Scala?

Until now I was running on a sample dataset in Python, and now I want to use Spark for the entire dataset.

asked Jan 16 '17 by Venkatesh Durgumahanthi


People also ask

What is melt in PySpark?

Melt (also known as unpivot) converts a data frame from wide to long format: the names of the melted columns become values in a variable column, paired with the corresponding cell values. There are likely several ways to implement a melt function in PySpark.

How do I Unpivot Spark data frame?

Unpivot is the reverse of pivot: column values are rotated into row values. There is no dedicated DataFrame operator for the unpivot operation; one approach is to use selectExpr() along with the stack built-in function, as sketched below.
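For illustration, here is a minimal sketch of that selectExpr()/stack approach. This is not from the original answers; the column names A, B, and C are assumptions chosen to match the example used in the answers below, and all melted columns must share a common type for stack to work:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed wide-format input: one id column A and two value columns B, C
df = spark.createDataFrame(
    [("a", 1, 2), ("b", 3, 4), ("c", 5, 6)], ["A", "B", "C"])

# stack(2, ...) emits two rows per input row, one per melted column
df.selectExpr(
    "A", "stack(2, 'B', B, 'C', C) as (variable, value)").show()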

What is melt ()?

The Pandas melt() function is used to change a DataFrame from wide to long format. It produces a layout in which one or more columns act as identifiers; all remaining columns are treated as values and unpivoted to the row axis, leaving only two non-identifier columns, variable and value.

Can you convert a Spark DataFrame to a pandas DataFrame?

PySpark DataFrame provides a toPandas() method to convert it to a Python Pandas DataFrame. toPandas() collects all records of the PySpark DataFrame to the driver program, so it should only be done on a small subset of the data.
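As a quick hedged illustration (sdf stands for any PySpark DataFrame; the limit(1000) is just an illustrative safeguard, not part of the API requirement):

# toPandas() collects every remaining row to the driver,
# so shrink the data first if the frame could be large.
pdf = sdf.limit(1000).toPandas()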


2 Answers

There is no built-in function (if you work with SQL and Hive support enabled you can use the stack function, but it is not exposed in the DataFrame API and has no native implementation), but it is trivial to roll your own. Required imports:

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable

Example implementation:

def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

And some tests (based on Pandas doctests):

import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                    'B': {0: 1, 1: 3, 2: 5},
                    'C': {0: 2, 1: 4, 2: 6}})

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

Note: to use this with legacy Python versions, remove the type annotations.

Related:

  • r sparkR - equivalent to melt function
  • Gather in sparklyr
answered by zero323


I came across this question while searching for an implementation of melt in Spark for Scala.

Posting my Scala port in case someone else stumbles upon this.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     *
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in Spark
     *
     *  @see reshape package in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a Scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
     *
     *  @todo method overloading for simple calling
     *
     *  @param id_vars the columns to preserve
     *  @param value_vars the columns to melt
     *  @param var_name the name for the column holding the melted column names
     *  @param value_name the name for the column holding the values of the melted columns
     */
    def melt(
            id_vars: Seq[String], value_vars: Seq[String],
            var_name: String = "variable", value_name: String = "value"): DataFrame = {

        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield {
            struct(lit(c).alias(var_name), col(c).alias(value_name))
        }): _*)

        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

        val cols = id_vars.map(col _) ++ {
            for (x <- List(var_name, value_name)) yield {
                col("_vars_and_vals")(x).alias(x)
            }
        }

        _tmp.select(cols: _*)
    }
}

Since I'm not that advanced in Scala, I'm sure there is room for improvement.

Any comments are welcome.

answered by Ahue