
Transpose column to row with Spark

I'm trying to transpose some columns of my table to rows. I'm using Python and Spark 1.5.0. Here is my initial table:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-----+-----+-------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |
+-----+-----+-----+-------+

I would like to have something like this:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|
|  ...|     ...|        ...|
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7|
|  ...|     ...|        ...|
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|
+-----+--------+-----------+

Does someone know how I can do it? Thank you for your help.

Asked by Raouf on Jun 16 '16.


People also ask

How do I transpose columns to rows in Spark?

Spark's pivot() function rotates data from one DataFrame/Dataset column into multiple columns (rows to columns), and unpivoting transforms it back (columns to rows), as sketched below.
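As a rough illustration only: pivot() exists from Spark 1.6 onward, the SQL stack() generator used here for the unpivot is available in later versions, and long_df, wide_df and the column names are made up for this sketch.

from pyspark.sql import functions as F

# Long -> wide (rows to columns): pivot on col_id, one cell per (A, col_id) pair.
wide_df = long_df.groupBy("A").pivot("col_id").agg(F.first("col_value"))

# Wide -> long (columns to rows): the SQL stack() generator undoes the pivot.
long_again = wide_df.selectExpr(
    "A",
    "stack(2, 'col_1', col_1, 'col_2', col_2) as (col_id, col_value)"
)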

How do I change column position in Spark?

To rearrange or reorder columns in PySpark, use the select() function. To put the columns in ascending alphabetical order, pass the sorted column names; for descending order, call sorted() with reverse=True. You can also rearrange columns by listing them explicitly in the desired positions. A short sketch follows.
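A minimal sketch, assuming a DataFrame df with the question's columns A, col_1 and col_2:

# Explicit reorder by naming the columns in the desired order.
reordered = df.select("col_2", "col_1", "A")

# Alphabetical (ascending) and reverse-alphabetical (descending) orderings.
ascending  = df.select(*sorted(df.columns))
descending = df.select(*sorted(df.columns, reverse=True))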

How do you pivot in Spark?

To do the same group/pivot/sum in Spark, the syntax is df.groupBy("A", "B").pivot("C"), followed by an aggregation such as sum().
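A hedged sketch of that pattern; the columns A, B, C and D are placeholders, not columns from the question's table:

# Group by A and B, turn the distinct values of C into columns,
# and fill each new column with the sum of D.
pivoted = df.groupBy("A", "B").pivot("C").sum("D")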


1 Answer

It is relatively simple to do with basic Spark SQL functions.

Python:

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))

    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
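For the two-row sample DataFrame defined above, calling to_long(df, ["A"]).show() should print something close to:

+---+-----+---+
|  A|  key|val|
+---+-----+---+
|  1|col_1|0.0|
|  1|col_2|0.6|
|  1|col_1|0.6|
|  1|col_2|0.7|
+---+-----+---+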

Scala:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c) }.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))
Answered by zero323 on Sep 23 '22.