
How to use User Defined Types in Spark 2.0?

In Spark 2.0, the one example I've found of creating a UDT in Scala no longer seems to be applicable. The UserDefinedType class has been made private, with the comment:

Note: This was previously a developer API in Spark 1.x. We are making this private in Spark 2.0 because we will very likely create a new version of this that works better with Datasets.

UDTRegistration might be intended as the new mechanism for declaring UDTs, but it is also private.

So far, my research tells me that there is no way to declare your own UDTs in Spark 2.0; is this conclusion correct?

asked Aug 23 '16 by Anders Olsson

People also ask

How does a UDF work in Spark?

In Spark, you create a UDF by writing a function in the language you are using with Spark. For example, if you are using Spark with Scala, you write the function in Scala and either wrap it with the udf() function to use it on a DataFrame, or register it as a UDF to use it in SQL.
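
For illustration, here is a minimal sketch of both styles; the column and table names ("name", "people") are made up for the example:

val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("udf-example").master("local[*]").getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions.{col, udf}

// A plain Scala function.
val shout = (s: String) => s.toUpperCase

// Wrap it with udf() to use it on a DataFrame.
val shoutUdf = udf(shout)
val df = Seq("alice", "bob").toDF("name")
df.select(shoutUdf(col("name"))).show()

// Or register it to use it from SQL.
spark.udf.register("shout", shout)
df.createOrReplaceTempView("people")
spark.sql("SELECT shout(name) FROM people").show()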

Should we use UDFs in Spark?

It is quite simple: rely as much as possible on Spark's built-in functions and only use a UDF when your transformation can't be done with them. UDFs cannot be optimized by Spark's Catalyst optimizer, so there is always a potential decrease in performance.
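
A small sketch of that contrast, assuming a toy DataFrame with a "name" column: Catalyst can reason about the built-in upper(), while the UDF is a black box to it.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf, upper}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq("alice", "bob").toDF("name")
val upperUdf = udf((s: String) => s.toUpperCase)

df.select(upper(col("name"))).explain()    // preferred: built-in, optimizer-friendly
df.select(upperUdf(col("name"))).explain() // works, but opaque to Catalyst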


2 Answers

You can get UDTs to work with Spark using UDTRegistration, but you have to go through a private API to do it, and that API may not be supported in the future. Use this approach with great caution and only when absolutely necessary. For some use cases, unfortunately, there is no other option.

Say you want to use a Polymorphic Record:

trait CustomPoly
case class FooPoly(id: Int) extends CustomPoly
case class BarPoly(value: String, secondValue: Long) extends CustomPoly

// ...and filter a Dataset of them like this (polySeq is built further down):
polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

You can write a custom UDT that encodes everything to bytes (I'm using Java serialization here, but it's probably better to instrument Spark's Kryo context).

First define the UDT class:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}

// Encodes a CustomPoly to a single binary column using Java serialization.
class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  override def sqlType: DataType = BinaryType

  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)
    bos.toByteArray
  }

  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    ois.readObject().asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}
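
As noted above, Kryo is probably a better fit than Java serialization. For reference, here is a hedged sketch (not from the original answer) of the same UDT written against Kryo directly; the class name CustomPolyKryoUDT is made up, and this file would also have to live under the org.apache.spark package:

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}

class CustomPolyKryoUDT extends UserDefinedType[CustomPoly] {
  override def sqlType: DataType = BinaryType

  override def serialize(obj: CustomPoly): Any = {
    // Kryo instances are not thread-safe; a production version would pool
    // them or keep one per thread instead of creating one per call.
    val kryo = new Kryo()
    val bos = new ByteArrayOutputStream()
    val output = new Output(bos)
    kryo.writeClassAndObject(output, obj) // writes the class tag plus the fields
    output.close()
    bos.toByteArray
  }

  override def deserialize(datum: Any): CustomPoly = {
    val kryo = new Kryo()
    val input = new Input(datum.asInstanceOf[Array[Byte]])
    kryo.readClassAndObject(input).asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}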

Then register it:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
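
Since both UserDefinedType and UDTRegistration are private[spark], the file that defines and registers the UDT needs a package declaration under org.apache.spark. A hedged sketch of what that file might look like; the sub-package name and the CustomPolySetup helper are made up for illustration:

// Hypothetical file layout: the sub-package "hack" and the helper object are
// illustrations, not part of the original answer.
package org.apache.spark.hack

import org.apache.spark.sql.types.UDTRegistration

object CustomPolySetup {
  // Assumes CustomPoly and CustomPolyUDT are visible from this package
  // (defined here or imported from wherever they live).
  def registerUdt(): Unit =
    UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
}

Call CustomPolySetup.registerUdt() once before building the Dataset.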

Then you can use it!

// As shown above:
case class UsingPoly(id: Int, poly: CustomPoly)

// Requires the SparkSession implicits in scope for .toDS():
// import spark.implicits._
val polySeq = Seq(
  UsingPoly(1, FooPoly(1)),
  UsingPoly(2, BarPoly("Blah", 123)),
  UsingPoly(3, FooPoly(1))
).toDS()

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Check out my original post here; it has an additional example: How to store custom objects in Dataset?

Edit: This post was down-voted, for understandable reasons. I have included a caveat emptor at the top, hopefully to prevent misunderstandings.

answered Oct 04 '22 by Choppy The Lumberjack


Well, you are right for now: Spark 2.x no longer has any public UDT API like the one in Spark 1.x.

You can see in the ticket SPARK-14155 that they made it private in order to create a new API. There has also been a ticket open since Spark 1.5, SPARK-7768, which we hope will be closed in Spark 2.2.

There is no good way to create your own UDT for now, but there are a few tricks you can use to get your custom objects into a Dataset. Here is one example.
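
One commonly used trick along these lines (a hedged sketch, not necessarily the example the answer links to) is to provide a Kryo-based Encoder for the custom type. The whole object is stored as a single binary column, so you lose columnar access to its fields, but no private APIs are involved:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Reusing the CustomPoly hierarchy from the other answer.
implicit val customPolyEncoder: Encoder[CustomPoly] = Encoders.kryo[CustomPoly]

val ds = spark.createDataset(Seq[CustomPoly](FooPoly(1), BarPoly("Blah", 123L)))
ds.show() // one binary "value" column holding the serialized objects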

answered Oct 04 '22 by Thiago Baldim