I have looked at a number of questions online, but they don't seem to do what I'm trying to achieve.
I'm using Apache Spark 2.0.2 with Scala.
I have a dataframe:
+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
| 1| 100| 0| 0| 0| 0| 0|
| 2| 0| 50| 0| 0| 20| 0|
| 3| 0| 0| 0| 0| 0| 0|
| 4| 0| 0| 0| 0| 0| 0|
+----------+-----+----+----+----+----+----+
which I want to transpose to
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
|val2| 0| 50| 0| 0|
|val3| 0| 0| 0| 0|
|val4| 0| 0| 0| 0|
|val5| 0| 20| 0| 0|
|val6| 0| 0| 0| 0|
+----+-----+----+----+----+
I've tried using pivot(), but I couldn't get to the right answer. I ended up looping through my val{x} columns and pivoting each as per below, but this is proving to be very slow.
val d = df.select('segment_id, 'val1)
+----------+-----+
|segment_id| val1|
+----------+-----+
| 1| 100|
| 2| 0|
| 3| 0|
| 4| 0|
+----------+-----+
d.groupBy('val1).sum().withColumnRenamed("val1", "vals")
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
+----+-----+----+----+----+
Then, on each val{x} iteration, I union() the result onto my first dataframe (the whole loop is sketched below), e.g. the val2 piece:
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val2| 0| 50| 0| 0|
+----+-----+----+----+----+
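For reference, the whole loop looks roughly like this. It is a simplified sketch of what I'm doing rather than my exact code, and the intermediate column names ("vals", "v") are just placeholders I've picked for this example:

import org.apache.spark.sql.functions.{col, lit}

// Simplified sketch of my current approach: pivot each val{x} column on its own,
// then union all the single-row results together. Correct output, but very slow.
val pieces = df.columns.tail.map { c =>
  df.select(lit(c).alias("vals"), col("segment_id"), col(c).alias("v"))
    .groupBy("vals")
    .pivot("segment_id")
    .sum("v")
}
val result = pieces.reduce(_ union _)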
Is there a more efficient way to do this transpose, given that I do not want to aggregate the data?
Thanks :)
Unfortunately there is no case when transposing a DataFrame is justified, considering the amount of data. You have to remember that a DataFrame, as implemented in Spark, is a distributed collection of rows, and each row is stored and processed on a single node.
You could express a transposition on a DataFrame as a pivot:
import org.apache.spark.sql.functions.{array, col, explode, first, lit, struct}

// Turn each val{x} column into a (column-name, value) struct and explode into rows.
val kv = explode(array(df.columns.tail.map {
  c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))

df
  .withColumn("kv", kv)
  .select($"segment_id", $"kv.k", $"kv.v")
  .groupBy($"k")
  .pivot("segment_id")            // one output column per segment_id
  .agg(first($"v"))               // each (k, segment_id) pair holds exactly one value, so first is safe
  .orderBy($"k")
  .withColumnRenamed("k", "vals")
but this is merely toy code with no practical application. In practice it is no better than collecting the data:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Collect to the driver and transpose locally: the first transposed row holds
// the segment_ids (the new column names), the rest hold the val{x} values.
val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
  case Array(h, t @ _*) =>
    (h.map(_.toString), t.map(_.collect { case x: Int => x }))
}

// Each original val{x} column becomes one row, prefixed with its name.
val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
val schema = StructType(
  StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
)
spark.createDataFrame(sc.parallelize(rows), schema)
For a DataFrame defined as:
val df = Seq(
(1, 100, 0, 0, 0, 0, 0),
(2, 0, 50, 0, 0, 20, 0),
(3, 0, 0, 0, 0, 0, 0),
(4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
both would give you the desired result:
+----+---+---+---+---+
|vals| 1| 2| 3| 4|
+----+---+---+---+---+
|val1|100| 0| 0| 0|
|val2| 0| 50| 0| 0|
|val3| 0| 0| 0| 0|
|val4| 0| 0| 0| 0|
|val5| 0| 20| 0| 0|
|val6| 0| 0| 0| 0|
+----+---+---+---+---+
That being said, if you need an efficient transposition on a distributed data structure, you'll have to look elsewhere. There are a number of structures, including the core CoordinateMatrix and BlockMatrix, which can distribute data across both dimensions and can be transposed.
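As a rough illustration of that route, here is a minimal sketch using mllib's CoordinateMatrix. It assumes the same df as above, and the mapping of segment_id - 1 and the val{x} position to matrix row/column indices is an arbitrary choice made for this example:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Turn each (segment_id, val{x}) cell into a MatrixEntry(row, col, value),
// where row = segment_id - 1 and col = position of val{x} (both assumed mappings).
val entries = df.rdd.flatMap { row =>
  val segment = row.getInt(0)
  (1 until row.length).map { i =>
    MatrixEntry(segment - 1, i - 1, row.getInt(i).toDouble)
  }
}

val matrix = new CoordinateMatrix(entries)

// transpose() just swaps the row and column indices of each entry;
// the data stays distributed across the cluster.
val transposed: CoordinateMatrix = matrix.transpose()

Note the result is purely numeric, so the val{x} names would have to be tracked separately (e.g. via the column index), and converting back to a DataFrame is an extra step.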