I have created a RelationalGroupedDataset by calling instances.groupBy(instances.col("property_name")):
val x = instances.groupBy(instances.col("property_name"))
How do I compose a user-defined aggregate function to perform Statistics.colStats().mean on each group?
Thanks!
Spark >= 2.4
You can use Summarizer:
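import org.apache.spark.ml.stat.Summarizer
import spark.implicits._ // for .as, .toDF and $"..." outside spark-shell
// df is the (group, features) DataFrame built in the example usage below;
// its features column holds old-style mllib vectors, so convert them with asML.
val dfNew = df.as[(Int, org.apache.spark.mllib.linalg.Vector)]
  .map { case (group, v) => (group, v.asML) }
  .toDF("group", "features")
dfNew
  .groupBy($"group")
  .agg(Summarizer.mean($"features").alias("means"))
  .show(false)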
+-----+--------------------------------------------------------------------+
|group|means |
+-----+--------------------------------------------------------------------+
|1 |[8.740630742016827E12,2.6124956666260462E14,3.268714653521495E14] |
|6 |[2.1153266920139112E15,2.07232483974322592E17,6.2715161747245427E17]|
|3 |[6.3781865566442836E13,8.359124419656149E15,1.865567821598214E14] |
|5 |[4.270201403521642E13,6.561211706745676E13,8.395448246737938E15] |
|9 |[3.577032684241448E16,2.5432362841314468E16,2.3744826986293008E17] |
|4 |[2.339253775419023E14,8.517531902022505E13,3.055115780965264E15] |
|8 |[8.029924756674456E15,7.284873600992855E17,3.08621303029924E15] |
|7 |[3.2275104122699105E15,7.5472363442090208E16,7.022556624056291E14] |
|10 |[1.2412562261010224E16,5.741115713769269E15,4.34336779990902E16] |
|2 |[1.085528901765636E16,7.633370115869126E12,6.952642232477029E11] |
+-----+--------------------------------------------------------------------+
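If you need more than the mean, Summarizer can also compute several statistics in one pass through Summarizer.metrics(...).summary(...). A minimal sketch, assuming a local SparkSession and a small made-up DataFrame (toy and stats are illustrative names, not part of the question):

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.stat.Summarizer
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("summarizer-sketch")
  .getOrCreate()
import spark.implicits._

// Made-up data: two rows in group 1, one row in group 2.
val toy = Seq(
  (1, Vectors.dense(1.0, 2.0)),
  (1, Vectors.dense(3.0, 4.0)),
  (2, Vectors.dense(5.0, 6.0))
).toDF("group", "features")

// metrics(...) builds a summary struct with one field per requested statistic.
val stats = toy
  .groupBy($"group")
  .agg(Summarizer.metrics("mean", "variance").summary($"features").alias("summary"))
  .select($"group", $"summary.mean", $"summary.variance")

stats.show(false)
```

Each requested metric becomes a field of the summary struct, so it can be selected as summary.mean, summary.variance, and so on.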
Spark < 2.4
You cannot use UserDefinedAggregateFunction, but you can create an Aggregator using the same MultivariateOnlineSummarizer that Statistics.colStats relies on internally:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
type Summarizer = MultivariateOnlineSummarizer
case class VectorSumarizer(f: String) extends Aggregator[Row, Summarizer, Vector]
    with Serializable {
  def zero = new Summarizer
  def reduce(acc: Summarizer, x: Row) = acc.add(x.getAs[Vector](f))
  def merge(acc1: Summarizer, acc2: Summarizer) = acc1.merge(acc2)
  // This can be easily generalized to support additional statistics
  def finish(acc: Summarizer) = acc.mean
  def bufferEncoder: Encoder[Summarizer] = Encoders.kryo[Summarizer]
  def outputEncoder: Encoder[Vector] = ExpressionEncoder()
}
Example usage:
import org.apache.spark.mllib.random.RandomRDDs.logNormalVectorRDD
import spark.implicits._ // for toDF and $"..." outside spark-shell
// Ten groups, each with 10000 random 3-dimensional vectors
val df = spark.sparkContext.union((1 to 10).map(i =>
  logNormalVectorRDD(spark.sparkContext, i, 10, 10000, 3, 1).map((i, _))
)).toDF("group", "features")
df
  .groupBy($"group")
  .agg(VectorSumarizer("features").toColumn.alias("means"))
  .show(10, false)
The result:
+-----+---------------------------------------------------------------------+
|group|means |
+-----+---------------------------------------------------------------------+
|1 |[1.0495089547176625E15,3.057434217141363E13,8.180842267228103E13] |
|6 |[8.578684690153061E15,1.865830977115807E14,1.0690831496167929E15] |
|3 |[1.0347016972600206E14,4.952536828257269E15,8.498944924018858E13] |
|5 |[2.2135916061736424E16,1.5137112888230388E14,8.154750681129871E14] |
|9 |[6.496030194110956E15,6.2697260327708368E16,3.7282521260607136E16] |
|4 |[2.4518629692233766E14,1.959083619621557E13,5.278689364420169E13] |
|8 |[1.806052212008392E16,2.0410654639336184E16,6.409495244104527E15] |
|7 |[1.32896092658714784E17,1.2074042288752348E15,1.10951746294648096E17]|
|10 |[1.6131199347666342E19,1.24546214832341616E17,8.5265750194040304E16] |
|2 |[4.330324858747168E12,6.19671483053885E12,2.2416578004282832E13] |
+-----+---------------------------------------------------------------------+
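The comment in finish says the aggregator can be generalized to other statistics. One possible way to do that is to pass in a function that picks the statistic from the finished summarizer. This is only a sketch written against the Spark 2.x API used above: VectorStat, toy and result are hypothetical names, and the data is made up.

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical generalization: `stat` selects which vector-valued statistic
// (mean, variance, max, min, ...) is extracted from the summarizer.
case class VectorStat(f: String, stat: MultivariateOnlineSummarizer => Vector)
    extends Aggregator[Row, MultivariateOnlineSummarizer, Vector] {
  def zero: MultivariateOnlineSummarizer = new MultivariateOnlineSummarizer
  // Fold one row's vector into the running summary.
  def reduce(acc: MultivariateOnlineSummarizer, x: Row): MultivariateOnlineSummarizer =
    acc.add(x.getAs[Vector](f))
  // Combine partial summaries computed on different partitions.
  def merge(a: MultivariateOnlineSummarizer,
            b: MultivariateOnlineSummarizer): MultivariateOnlineSummarizer = a.merge(b)
  def finish(acc: MultivariateOnlineSummarizer): Vector = stat(acc)
  def bufferEncoder: Encoder[MultivariateOnlineSummarizer] =
    Encoders.kryo[MultivariateOnlineSummarizer]
  def outputEncoder: Encoder[Vector] = ExpressionEncoder()
}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("vector-stat-sketch")
  .getOrCreate()
import spark.implicits._

// Made-up data with old-style mllib vectors.
val toy = Seq(
  (1, Vectors.dense(1.0, 3.0)),
  (1, Vectors.dense(3.0, 5.0)),
  (2, Vectors.dense(10.0, 20.0))
).toDF("group", "features")

val result = toy
  .groupBy($"group")
  .agg(
    VectorStat("features", _.mean).toColumn.alias("means"),
    VectorStat("features", _.max).toColumn.alias("maxima"))

result.show(false)
```

Only statistics that return a Vector fit this signature; count, for example, returns a Long and would need a different output type and encoder.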
Note:
MultivariateOnlineSummarizer requires the "old style" mllib.linalg.Vector; it won't work with ml.linalg.Vector. To support the latter, you'll have to convert between the new and old types.
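The conversion between the two vector types can be sketched as follows, using Vectors.fromML (new to old) and asML (old to new). The aliases MLVector, OldVector and the sample values are just for illustration:

```scala
import org.apache.spark.ml.linalg.{Vector => MLVector, Vectors => MLVectors}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}

// A new-style vector, as produced by the spark.ml pipeline stages.
val mlVec: MLVector = MLVectors.dense(1.0, 2.0, 3.0)

// New -> old: needed before handing the vector to MultivariateOnlineSummarizer.
val oldVec: OldVector = OldVectors.fromML(mlVec)

// Old -> new: needed if the result should go back into spark.ml code.
val roundTrip: MLVector = oldVec.asML

// Inside the Aggregator above, reduce could then be adapted along these lines
// (an assumption, not part of the original answer):
//   acc.add(OldVectors.fromML(x.getAs[MLVector](f)))
```

Both conversions are local and do not need a SparkSession.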