Spark 2.3.0 with Scala 2.11. I'm implementing a custom Aggregator according to the docs here. The aggregator requires three type parameters: input, buffer, and output.
My aggregator has to act on all previous rows in the window, so I declared it like this:
case class Foo(...)
object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
// other override methods
override def bufferEncoder: Encoder[ListBuffer[Foo]] = ???
}
One of the override methods is supposed to return the encoder for the buffer type, which in this case is ListBuffer[Foo]. I can't find a suitable encoder in org.apache.spark.sql.Encoders, nor any other way to encode this, so I don't know what to return here.
I thought of creating a new case class with a single property of type ListBuffer[Foo], using that as my buffer class, and then calling Encoders.product on it, but I'm not sure whether that's necessary or whether there's something else I'm missing. Thanks for any tips.
You should just let Spark SQL do its work and find the proper encoder using ExpressionEncoder
as follows:
scala> spark.version
res0: String = 2.3.0
case class Mod(id: Long)
import org.apache.spark.sql.Encoder
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> val enc: Encoder[ListBuffer[Mod]] = ExpressionEncoder()
enc: org.apache.spark.sql.Encoder[scala.collection.mutable.ListBuffer[Mod]] = class[value[0]: array<struct<id:bigint>>]
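Wired into the aggregator from the question, that would look roughly like this (Foo's single Long field and the finish logic are placeholders, not part of the original question):

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

case class Foo(id: Long)  // placeholder definition

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
  override def zero: ListBuffer[Foo] = ListBuffer.empty[Foo]
  override def reduce(buffer: ListBuffer[Foo], row: Foo): ListBuffer[Foo] = buffer += row
  override def merge(b1: ListBuffer[Foo], b2: ListBuffer[Foo]): ListBuffer[Foo] = b1 ++= b2
  override def finish(buffer: ListBuffer[Foo]): Boolean = buffer.nonEmpty  // placeholder logic
  // Spark SQL derives the encoder for ListBuffer[Foo] from the expected return type
  override def bufferEncoder: Encoder[ListBuffer[Foo]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

You can then use it as a typed column, e.g. ds.groupByKey(...).agg(MyAggregator.toColumn).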