I'm trying to transpose some columns of my table to row. I'm using Python and Spark 1.5.0. Here is my initial table:
+-----+-----+-----+-------+ | A |col_1|col_2|col_...| +-----+-------------------+ | 1 | 0.0| 0.6| ... | | 2 | 0.6| 0.7| ... | | 3 | 0.5| 0.9| ... | | ...| ...| ...| ... |
I would like to have somthing like this:
+-----+--------+-----------+ | A | col_id | col_value | +-----+--------+-----------+ | 1 | col_1| 0.0| | 1 | col_2| 0.6| | ...| ...| ...| | 2 | col_1| 0.6| | 2 | col_2| 0.7| | ...| ...| ...| | 3 | col_1| 0.5| | 3 | col_2| 0.9| | ...| ...| ...|
Does someone know haw I can do it? Thank you for your help.
Spark pivot() function is used to pivot/rotate the data from one DataFrame/Dataset column into multiple columns (transform row to column) and unpivot is used to transform it back (transform columns to rows).
In order to Rearrange or reorder the column in pyspark we will be using select function. To reorder the column in ascending order we will be using Sorted function. To reorder the column in descending order we will be using Sorted function with an argument reverse =True. We also rearrange the column by position.
To do the same group/pivot/sum in Spark the syntax is df. groupBy("A", "B"). pivot("C").
It is relatively simple to do with basic Spark SQL functions.
Python
from pyspark.sql.functions import array, col, explode, struct, lit df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"]) def to_long(df, by): # Filter dtypes and split into column names and type description cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by)) # Spark SQL supports only homogeneous columns assert len(set(dtypes)) == 1, "All columns have to be of the same type" # Create and explode an array of (column_name, column_value) structs kvs = explode(array([ struct(lit(c).alias("key"), col(c).alias("val")) for c in cols ])).alias("kvs") return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"]) to_long(df, ["A"])
Scala:
import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.{array, col, explode, lit, struct} val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2") def toLong(df: DataFrame, by: Seq[String]): DataFrame = { val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1") val kvs = explode(array( cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _* )) val byExprs = by.map(col(_)) df .select(byExprs :+ kvs.alias("_kvs"): _*) .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*) } toLong(df, Seq("A"))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With