I would like to not use null value for field of a class used in dataset. I try to use scala Option
and java Optional
but it failed:
@AllArgsConstructor // lombok
@NoArgsConstructor // mutable type is required in java :(
@Data // see https://stackoverflow.com/q/59609933/1206998
public static class TestClass {
String id;
Option<Integer> optionalInt;
}
@Test
public void testDatasetWithOptionField(){
Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
new TestClass("item 1", Option.apply(1)),
new TestClass("item .", Option.empty())
), Encoders.bean(TestClass.class));
ds.collectAsList().forEach(x -> System.out.println("Found " + x));
}
Fails, at runtime, with message File 'generated.java', Line 77, Column 47: Cannot instantiate abstract "scala.Option"
Question: Is there a way to encode optional fields without null in a dataset, using java?
Subsidiary question: btw, I didn't use much dataset in scala either, can you validate that it is actually possible in scala to encode a case class containing Option fields?
Note: This is used in an intermediate dataset, i.e something that isn't read nor write (but for spark internal serialization)
This is fairly simple to do in Scala.
Scala Implementation
import org.apache.spark.sql.{Encoders, SparkSession}
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("Stack-scala")
.master("local[2]")
.getOrCreate()
val ds = spark.createDataset(Seq(
TestClass("Item 1", Some(1)),
TestClass("Item 2", None)
))( Encoders.product[TestClass])
ds.collectAsList().forEach(println)
spark.stop()
}
case class TestClass(
id: String,
optionalInt: Option[Int] )
}
Java
There are various Option classes available in Java. However, none of them work out-of-the-box.
java.util.Optional
: Not serializable
scala.Option
-> Serializable but abstract, so when CodeGenerator
generates the following code, it fails!/* 081 */ // initializejavabean(newInstance(class scala.Option))
/* 082 */ final scala.Option value_9 = false ?
/* 083 */ null : new scala.Option(); // ---> Such initialization is not possible for abstract classes
/* 084 */ scala.Option javaBean_1 = value_9;
org.apache.spark.api.java.Optional
-> Spark's implementation of Optional which is serializable but has private constructors. So, it fails with error : No applicable constructor/method found for zero actual parameters. Since this is a final
class, it's not possible to extend this./* 081 */ // initializejavabean(newInstance(class org.apache.spark.api.java.Optional))
/* 082 */ final org.apache.spark.api.java.Optional value_9 = false ?
/* 083 */ null : new org.apache.spark.api.java.Optional();
/* 084 */ org.apache.spark.api.java.Optional javaBean_1 = value_9;
/* 085 */ if (!false) {
One option is to use normal Java Optionals in the data class and then use Kryo as serializer.
Encoder en = Encoders.kryo(TestClass.class);
Dataset<TestClass> ds = spark.createDataset(Arrays.asList(
new TestClass("item 1", Optional.of(1)),
new TestClass("item .", Optional.empty())
), en);
ds.collectAsList().forEach(x -> System.out.println("Found " + x));
Output:
Found TestClass(id=item 1, optionalInt=Optional[1])
Found TestClass(id=item ., optionalInt=Optional.empty)
There is a downside when using Kryo: this encoder encodes in a binary format:
ds.printSchema();
ds.show(false);
prints
root
|-- value: binary (nullable = true)
+-------------------------------------------------------------------------------------------------------+
|value |
+-------------------------------------------------------------------------------------------------------+
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 B1 01 02 02]|
|[01 00 4A 61 76 61 53 74 61 72 74 65 72 24 54 65 73 74 43 6C 61 73 F3 01 01 69 74 65 6D 20 AE 01 00] |
+-------------------------------------------------------------------------------------------------------+
An udf-based solution to get the normal output columns of a dataset encoded with Kryo describes this answer.
Maybe a bit off-topic but probably a start to find a long-term solution is to look at the code of JavaTypeInference. The methods serializerFor and deserializerFor are used by ExpressionEncoder.javaBean to create the serializer and deserializer part of the encoder for Java beans.
In this pattern matching block
typeToken.getRawType match {
case c if c == classOf[String] => createSerializerForString(inputObject)
case c if c == classOf[java.time.Instant] => createSerializerForJavaInstant(inputObject)
case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)
case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
[...]
there is the handling for java.util.Optional
missing. It could probably be added here as well as in the corresponding deserialize method. This would allow Java beans to have properties of type Optional
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With