Spark: group concat equivalent in scala rdd

I have the following DataFrame:

    |-----id-------|----value------|-----desc------|
    |     1        |     v1        |      d1       |
    |     1        |     v2        |      d2       |
    |     2        |     v21       |      d21      |
    |     2        |     v22       |      d22      |
    |--------------|---------------|---------------|

I want to transform it into:

    |-----id-------|----value------|-----desc------|
    |     1        |     v1;v2     |      d1;d2    |
    |     2        |     v21;v22   |      d21;d22  |
    |--------------|---------------|---------------|
  • Is it possible through DataFrame operations?
  • What would the RDD transformation look like in this case?

I presume rdd.reduce is the key, but I have no idea how to adapt it to this scenario.

asked Dec 08 '15 by Silverrose


People also ask

What is concat_ws in Scala?

Using the concat() or concat_ws() Spark SQL functions, you can concatenate one or more DataFrame columns into a single column; concat_ws() additionally takes a separator as its first argument. The same concatenation can also be written in raw SQL.
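
For example (a minimal sketch; the DataFrame df and its columns are assumed for illustration):

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

// concat joins the column values directly; concat_ws takes a separator as its first argument
df.select(
  concat(col("value"), lit("-"), col("desc")).as("value_desc"),
  concat_ws(";", col("value"), col("desc")).as("value_desc_ws")
).show()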

What is agg() in Spark?

agg(Column expr, scala.collection.Seq<Column> exprs) computes aggregates by specifying a series of aggregate columns. agg(scala.collection.immutable.Map<String,String> exprs) (Scala-specific) computes aggregates by specifying a map from column name to aggregate method.
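
For example, on a DataFrame df shaped like the one in the question (a sketch, not taken from the quoted docs):

import org.apache.spark.sql.functions.{col, count, max}

// Map variant: column name -> aggregate method name
df.groupBy("id").agg(Map("value" -> "count", "desc" -> "max")).show()

// Column variant: a sequence of aggregate expressions
df.groupBy("id").agg(count(col("value")), max(col("desc"))).show()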

What is RelationalGroupedDataset?

RelationalGroupedDataset is a set of methods for aggregations on a DataFrame, created by Dataset.groupBy. The main method is agg, which has multiple variants. The class also contains convenience methods for first-order statistics such as mean and sum.
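
In other words (a small sketch with an assumed DataFrame df):

// groupBy does not return a DataFrame but a RelationalGroupedDataset
// (called GroupedData in Spark 1.x); agg and convenience aggregations
// such as count, mean and sum are defined on it
val grouped = df.groupBy("id")
grouped.count().show()   // one row per id with the size of its group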


3 Answers

You can transform your data using Spark SQL:

import sqlContext.implicits._

case class Test(id: Int, value: String, desc: String)

val data = sc.parallelize(Seq((1, "v1", "d1"), (1, "v2", "d2"), (2, "v21", "d21"), (2, "v22", "d22")))
  .map(line => Test(line._1, line._2, line._3))
  .toDF()

data.registerTempTable("data")
val result = sqlContext.sql(
  "select id, concat_ws(';', collect_list(value)) as value, concat_ws(';', collect_list(desc)) as desc " +
  "from data group by id")
result.show()
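
For the first bullet in the question, a sketch of the same aggregation using only the DataFrame API (collect_list requires Spark 1.6+, or a HiveContext on earlier versions):

import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

val result2 = data
  .groupBy("id")
  .agg(
    concat_ws(";", collect_list(col("value"))).as("value"),
    concat_ws(";", collect_list(col("desc"))).as("desc"))
result2.show()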
answered by Kaushal


Suppose you have something like

import scala.util.Random
import org.apache.spark.sql.SQLContext

val sqlc: SQLContext = ???

case class Record(id: Long, value: String, desc: String)

val testData = for {
    (i, j) <- List.fill(30)((Random.nextInt(5), Random.nextInt(5)))
  } yield Record(i, s"v$i$j", s"d$i$j")

val df = sqlc.createDataFrame(testData)

You can then aggregate each column by key and zip the results:

import sqlc.implicits._

def aggConcat(col: String) = df
      .map(row => (row.getAs[Long]("id"), row.getAs[String](col)))
      .aggregateByKey(Vector[String]())(_ :+ _, _ ++ _)

val result = aggConcat("value").zip(aggConcat("desc")).map{
      case ((id, value), (_, desc)) => (id, value, desc)
    }.toDF("id", "values", "descs") 

If you would like concatenated strings instead of arrays, you can then run:

import org.apache.spark.sql.functions._

val resultConcat =  result
      .withColumn("values", concat_ws(";", $"values"))
      .withColumn("descs" , concat_ws(";", $"descs" ))
answered by Odomontois


If working with DataFrames, use a UDAF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

class ConcatStringsUDAF(InputColumnName: String, sep: String = ",") extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField(InputColumnName, StringType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("concatString", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

  private def concatStrings(str1: String, str2: String): String = {
   (str1, str2) match {
      case (s1: String, s2: String) => Seq(s1, s2).filter(_ != "").mkString(sep)
      case (null, s: String) => s
      case (s: String, null) => s
      case _ => ""
    }
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val acc1 = buffer.getAs[String](0)
    val acc2 = input.getAs[String](0)
    buffer(0) = concatStrings(acc1, acc2)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val acc1 = buffer1.getAs[String](0)
    val acc2 = buffer2.getAs[String](0)
    buffer1(0) = concatStrings(acc1, acc2)
  }

  def evaluate(buffer: Row): Any = buffer.getAs[String](0)
}

And then use it this way:

val stringConcatener = new ConcatStringsUDAF("Category_ID", ",")
data.groupBy("aaid", "os_country").agg(stringConcatener(data("X")).as("Xs"))
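
Adapted to the question's DataFrame (here df is assumed to hold the id/value/desc data), this would look roughly like:

val valueConcat = new ConcatStringsUDAF("value", ";")
val descConcat  = new ConcatStringsUDAF("desc", ";")

df.groupBy("id")
  .agg(valueConcat(df("value")).as("value"), descConcat(df("desc")).as("desc"))
  .show()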

As of Spark 1.6, have a look at Datasets and Aggregator.
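
A rough sketch of that typed approach, written against the Spark 2.x Aggregator API (the 1.6 API differs slightly, e.g. it has no encoder members):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Rec(id: Int, value: String, desc: String)

// concatenates the value field of all records in a group with ";"
val concatValues = new Aggregator[Rec, String, String] {
  def zero: String = ""
  def reduce(acc: String, r: Rec): String = if (acc.isEmpty) r.value else acc + ";" + r.value
  def merge(a: String, b: String): String = Seq(a, b).filter(_.nonEmpty).mkString(";")
  def finish(acc: String): String = acc
  def bufferEncoder: Encoder[String] = Encoders.STRING
  def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn

import spark.implicits._          // spark is the SparkSession (e.g. in spark-shell)
val ds = df.as[Rec]               // typed view of the question's DataFrame
ds.groupByKey(_.id).agg(concatValues).show()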

answered by Boris