 

How to create an Encoder for Scala collection (to implement custom Aggregator)?

Spark 2.3.0 with Scala 2.11. I'm implementing a custom Aggregator according to the docs here. The aggregator requires 3 types for input, buffer, and output.

My aggregator has to act upon all previous rows in the window, so I declared it like this:

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

case class Foo(...)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
    // other override methods
    override def bufferEncoder: Encoder[ListBuffer[Foo]] = ???
}

One of the override methods is supposed to return the encoder for the buffer type, which in this case is a ListBuffer. I can't find any suitable encoder in org.apache.spark.sql.Encoders, nor any other way to encode this, so I don't know what to return here.

I thought of creating a new case class which has a single property of type ListBuffer[Foo] and using that as my buffer class, and then using Encoders.product on that, but I am not sure if that is necessary or if there is something else I am missing. Thanks for any tips.
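For illustration, that workaround would look roughly like this (the FooBuffer wrapper, the Foo field, and the method bodies are only placeholders, not working code I have):

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Foo(id: Long) // illustrative field

// hypothetical wrapper so the buffer is a Product that Encoders.product can handle
case class FooBuffer(items: ListBuffer[Foo])

object MyWrappedAggregator extends Aggregator[Foo, FooBuffer, Boolean] {
    override def zero: FooBuffer = FooBuffer(ListBuffer.empty)
    override def reduce(buf: FooBuffer, row: Foo): FooBuffer = { buf.items += row; buf }
    override def merge(b1: FooBuffer, b2: FooBuffer): FooBuffer = { b1.items ++= b2.items; b1 }
    override def finish(buf: FooBuffer): Boolean = buf.items.nonEmpty // placeholder logic
    override def bufferEncoder: Encoder[FooBuffer] = Encoders.product[FooBuffer]
    override def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}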

asked Mar 07 '23 by Uncle Long Hair

1 Answer

You should just let Spark SQL do its work and find the proper encoder using ExpressionEncoder as follows:

scala> spark.version
res0: String = 2.3.0

scala> case class Mod(id: Long)

scala> import org.apache.spark.sql.Encoder
scala> import scala.collection.mutable.ListBuffer
scala> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
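Wired into the aggregator from the question, that looks roughly like this (a sketch; the zero/reduce/merge/finish bodies are placeholders, not part of the original question):

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class Mod(id: Long)

object MyAggregator extends Aggregator[Mod, ListBuffer[Mod], Boolean] {
    override def zero: ListBuffer[Mod] = ListBuffer.empty
    override def reduce(buf: ListBuffer[Mod], row: Mod): ListBuffer[Mod] = buf += row
    override def merge(b1: ListBuffer[Mod], b2: ListBuffer[Mod]): ListBuffer[Mod] = b1 ++= b2
    override def finish(buf: ListBuffer[Mod]): Boolean = buf.nonEmpty // placeholder logic
    // let Spark SQL derive both encoders
    override def bufferEncoder: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
    override def outputEncoder: Encoder[Boolean] = ExpressionEncoder()
}

You can then use it as a typed column, e.g. via MyAggregator.toColumn in a groupByKey(...).agg(...) call.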
answered Mar 10 '23 by Jacek Laskowski