I want to save a Spark DataFrame that contains a custom class as a column to a Parquet file. This class is composed of a Seq of another custom class. To do so, I create a UserDefinedType class for each of these classes, in a similar way to VectorUDT. I can work with the DataFrame as intended, but I cannot save it to disk as Parquet (or JSON). I reported it as a bug, but maybe there is a problem with my code. I've implemented a simpler example to show the problem:
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types._

// A holds a sequence of B, so its UDT has to describe a nested custom type.
@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list: Seq[B])

class AUDT extends UserDefinedType[A] {

  override def sqlType: DataType =
    StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))

  override def userClass: Class[A] = classOf[A]

  override def serialize(obj: Any): Any = obj match {
    case A(list) =>
      val row = new GenericMutableRow(1)
      row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
      row
  }

  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
    }
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num: Int)

class BUDT extends UserDefinedType[B] {

  override def sqlType: DataType =
    StructType(Seq(StructField("num", IntegerType, nullable = false)))

  override def userClass: Class[B] = classOf[B]

  override def serialize(obj: Any): Any = obj match {
    case B(num) =>
      val row = new GenericMutableRow(1)
      row.setInt(0, num)
      row
  }

  override def deserialize(datum: Any): B = {
    datum match {
      case row: InternalRow => new B(row.getInt(0))
    }
  }
}

object BUDT extends BUDT

object TestNested {
  def main(args: Array[String]) = {
    val col = Seq(new A(Seq(new B(1), new B(2))),
                  new A(Seq(new B(3), new B(4))))

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(1 to 2 zip col).toDF()
    df.show()

    df.write.mode(SaveMode.Overwrite).save(...)
  }
}
This results in the following error:
15/09/16 16:44:39 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException: Nested type should be repeated: required group array {
  required int32 num;
}
    at org.apache.parquet.schema.ConversionPatterns.listWrapper(ConversionPatterns.java:42)
    at org.apache.parquet.schema.ConversionPatterns.listType(ConversionPatterns.java:97)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:460)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:522)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convertField$1.apply(CatalystSchemaConverter.scala:521)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:521)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:526)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convertField(CatalystSchemaConverter.scala:318)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$$anonfun$convert$1.apply(CatalystSchemaConverter.scala:311)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:92)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:92)
    at org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.convert(CatalystSchemaConverter.scala:311)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypesConverter.scala:58)
    at org.apache.spark.sql.execution.datasources.parquet.RowWriteSupport.init(ParquetTableSupport.scala:55)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:272)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:234)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/09/16 16:44:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost):
If I save a DataFrame with B instead of A, there is no problem, since B has no nested custom class. Am I missing something?
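For example, this simpler variant with only B saves fine. It is just a minimal sketch reusing sc, sqlContext.implicits._, and the B/BUDT definitions above; the output path is a placeholder:

// B has no nested UDT, so writing it to Parquet works.
val bcol = Seq(new B(1), new B(2))
val dfB = sc.parallelize(1 to 2 zip bcol).toDF()
dfB.show()
dfB.write.mode(SaveMode.Overwrite).parquet("/tmp/b_only.parquet") // placeholder path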
I had to make four changes to your code to make it work (tested in Spark 1.6.0 on Linux), and I think I can mostly explain why they're needed. I do find myself wondering, however, whether there's a simpler solution. All the changes are in AUDT, as follows:

1. In sqlType, make it depend on BUDT.sqlType, rather than just BUDT.
2. In serialize(), call BUDT.serialize() on each list element.
3. In deserialize(), call toArray(BUDT.sqlType) instead of toArray(BUDT).
4. In deserialize(), call BUDT.deserialize() on every element.

Here's the resulting code:
class AUDT extends UserDefinedType[A] {

  override def sqlType: DataType =
    StructType(
      Seq(StructField("list",
                      ArrayType(BUDT.sqlType, containsNull = false),
                      nullable = true)))

  override def userClass: Class[A] = classOf[A]

  override def serialize(obj: Any): Any =
    obj match {
      case A(list) =>
        val row = new GenericMutableRow(1)
        // Serialize each B explicitly via its own UDT.
        val elements =
          list.map(_.asInstanceOf[Any])
              .map(e => BUDT.serialize(e))
              .toArray
        row.update(0, new GenericArrayData(elements))
        row
    }

  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow =>
        val first = row.getArray(0)
        // Deserialize each element explicitly via BUDT as well.
        val bs: Array[InternalRow] = first.toArray(BUDT.sqlType)
        val bseq = bs.toSeq.map(e => BUDT.deserialize(e))
        val a = new A(bseq)
        a
    }
  }
}
All four changes have the same character: the relationship between the handling of A and the handling of B is now very explicit, for schema typing, for serialization, and for deserialization. The original code seems to be based on the assumption that Spark SQL will "just figure it out", which might be reasonable, but apparently it doesn't.
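A quick way to check the fix, assuming the same TestNested setup as in your question (the path below is only a placeholder), is to write the DataFrame out and read it back:

// Round trip with the fixed AUDT: write to Parquet, then reload and inspect.
val df = sc.parallelize(1 to 2 zip col).toDF()
df.write.mode(SaveMode.Overwrite).parquet("/tmp/nested_udt.parquet") // placeholder path
val reloaded = sqlContext.read.parquet("/tmp/nested_udt.parquet")
reloaded.show() // should print the same two rows of A values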