I have the following DataFrame:
|-----id-------|----value------|-----desc------|
| 1 | v1 | d1 |
| 1 | v2 | d2 |
| 2 | v21 | d21 |
| 2 | v22 | d22 |
|--------------|---------------|---------------|
I want to transform it into:
|-----id-------|----value------|-----desc------|
| 1 | v1;v2 | d1;d2 |
| 2 | v21;v22 | d21;d22 |
|--------------|---------------|---------------|
I presume rdd.reduce is the key, but I have no idea how to adapt it to this scenario.
Using the concat() or concat_ws() Spark SQL functions you can concatenate one or more DataFrame columns into a single column; the answers below show how to do this with the DataFrame API, with raw SQL, and with custom aggregation in Scala. The aggregation itself goes through agg, which accepts either a sequence of aggregate Column expressions or (Scala-specific) a Map from column name to aggregate method. agg lives on RelationalGroupedDataset, the class returned by Dataset.groupBy; besides agg it also offers convenience methods for common statistics such as mean and sum.
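For example, a minimal DataFrame-API sketch that produces the requested output, assuming a DataFrame df with the id, value and desc columns from the question (collect_list is exposed in org.apache.spark.sql.functions as of Spark 1.6; on 1.x it needs a HiveContext, on 2.x it works out of the box):

import org.apache.spark.sql.functions.{collect_list, concat_ws}

// collect all values/descs per id, then join each collected list with ";"
val grouped = df.groupBy("id").agg(
  concat_ws(";", collect_list("value")).as("value"),
  concat_ws(";", collect_list("desc")).as("desc")
)
grouped.show()

Note that collect_list does not guarantee the order of the collected elements within a group.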
You can transform your data using Spark SQL:

case class Test(id: Int, value: String, desc: String)

import sqlContext.implicits._  // needed for toDF()

val data = sc.parallelize(Seq((1, "v1", "d1"), (1, "v2", "d2"), (2, "v21", "d21"), (2, "v22", "d22")))
  .map(line => Test(line._1, line._2, line._3))
  .toDF()

data.registerTempTable("data")

val result = sqlContext.sql("select id, concat_ws(';', collect_list(value)) as value, concat_ws(';', collect_list(desc)) as desc from data group by id")
result.show
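On Spark 2.x the same query runs through a SparkSession, and registerTempTable has been superseded by createOrReplaceTempView. A sketch, assuming a SparkSession named spark:

data.createOrReplaceTempView("data")
val result = spark.sql("select id, concat_ws(';', collect_list(value)) as value, concat_ws(';', collect_list(desc)) as desc from data group by id")
result.show()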
Suppose you have something like
import scala.util.Random
val sqlc: SQLContext = ???
case class Record(id: Long, value: String, desc: String)
val testData = for {
  (i, j) <- List.fill(30)((Random.nextInt(5), Random.nextInt(5)))
} yield Record(i, s"v$i$j", s"d$i$j")
val df = sqlc.createDataFrame(testData)
You can then aggregate the data per id as follows:

import sqlc.implicits._

// collect all entries of the given column per id into a Vector
def aggConcat(col: String) = df
  .map(row => (row.getAs[Long]("id"), row.getAs[String](col)))
  .aggregateByKey(Vector[String]())(_ :+ _, _ ++ _)

val result = aggConcat("value").zip(aggConcat("desc")).map {
  case ((id, value), (_, desc)) => (id, value, desc)
}.toDF("id", "values", "descs")
If you would like concatenated strings instead of arrays, you can then run:
import org.apache.spark.sql.functions._
val resultConcat = result
.withColumn("values", concat_ws(";", $"values"))
.withColumn("descs" , concat_ws(";", $"descs" ))
If you are working with DataFrames and want a reusable aggregation, you can define a UDAF (UserDefinedAggregateFunction):
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
class ConcatStringsUDAF(inputColumnName: String, sep: String = ",") extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(StructField(inputColumnName, StringType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("concatString", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

  // join two partial results, skipping nulls and empty strings
  private def concatStrings(str1: String, str2: String): String = (str1, str2) match {
    case (s1: String, s2: String) => Seq(s1, s2).filter(_ != "").mkString(sep)
    case (null, s: String) => s
    case (s: String, null) => s
    case _ => ""
  }

  // fold one input row into the aggregation buffer
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val acc1 = buffer.getAs[String](0)
    val acc2 = input.getAs[String](0)
    buffer(0) = concatStrings(acc1, acc2)
  }

  // combine two partial buffers
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val acc1 = buffer1.getAs[String](0)
    val acc2 = buffer2.getAs[String](0)
    buffer1(0) = concatStrings(acc1, acc2)
  }

  def evaluate(buffer: Row): Any = buffer.getAs[String](0)
}
And then use it like this, applied to a DataFrame df with the question's id, value and desc columns:

val concatValues = new ConcatStringsUDAF("value", ";")
val concatDescs = new ConcatStringsUDAF("desc", ";")

df.groupBy("id").agg(concatValues(df("value")).as("values"), concatDescs(df("desc")).as("descs"))
As of Spark 1.6, also have a look at Datasets and the Aggregator API.
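A minimal Aggregator sketch under that API, assuming Spark 2.x Dataset syntax, a SparkSession named spark and the Record case class from above (the ConcatValues object is illustrative, not part of any of the answers):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import spark.implicits._

// concatenates the value field of all Records in a group with ";"
object ConcatValues extends Aggregator[Record, String, String] {
  def zero: String = ""
  def reduce(acc: String, r: Record): String = if (acc.isEmpty) r.value else acc + ";" + r.value
  def merge(a: String, b: String): String = Seq(a, b).filter(_.nonEmpty).mkString(";")
  def finish(acc: String): String = acc
  def bufferEncoder: Encoder[String] = Encoders.STRING
  def outputEncoder: Encoder[String] = Encoders.STRING
}

val ds = df.as[Record]
ds.groupByKey(_.id).agg(ConcatValues.toColumn.name("values")).show()

A second Aggregator (or one that returns a tuple of both concatenations) handles the desc column in the same way.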