I've got two parquet files: one contains an integer field myField, and the other contains a double field myField. When attempting to read both files at once:
val basePath = "/path/to/file/"
val fileWithInt = basePath + "intFile.snappy.parquet"
val fileWithDouble = basePath + "doubleFile.snappy.parquet"
val result = spark.sqlContext.read
  .option("mergeSchema", true)
  .option("basePath", basePath)
  .parquet(Seq(fileWithInt, fileWithDouble): _*)
  .select("myField")
I get the following error:
Caused by: org.apache.spark.SparkException: Failed to merge fields 'myField' and 'myField'. Failed to merge incompatible data types IntegerType and DoubleType
When passing an explicit schema:
val schema = StructType(Seq(StructField("myField", IntegerType)))
val result = spark.sqlContext.read
  .schema(schema)
  .option("mergeSchema", true)
  .option("basePath", basePath)
  .parquet(Seq(fileWithInt, fileWithDouble): _*)
  .select("myField")
it fails with the following:
java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
When instead widening the explicit schema to a double:
val schema = StructType(Seq(new StructField("myField", DoubleType)))
I get:
java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
at org.apache.parquet.column.Dictionary.decodeToDouble(Dictionary.java:60)
Does anyone know of a way around this problem other than reprocessing the source data?
Parquet data is immutable, which is the underlying problem whenever the data needs editing: you can add partitions to a Parquet dataset, but you can't edit the data in place.
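As an illustration, new rows can only be appended alongside the existing files. A minimal sketch, reusing the basePath from the question and a hypothetical directory of new rows:
val updates = spark.read.parquet("/path/to/new/rows") // hypothetical source of new rows
updates.write
  .mode("append")   // writes additional files; existing files are untouched
  .parquet(basePath)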
Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas.
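Schema merging in this sense works when files differ only by added columns. A minimal sketch with hypothetical paths, where fileA holds (id: Int) and fileB holds (id: Int, extra: String):
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/path/to/fileA.parquet", "/path/to/fileB.parquet")
merged.printSchema() // shows both id and extra; no column changes type
It fails in the question above because myField changes type between files, which mergeSchema cannot reconcile.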
Depending on the number of files you are going to read, you can use one of these two approaches.
This one works best for a smaller number of parquet files:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.DoubleType

def merge(spark: SparkSession, paths: Seq[String]): DataFrame = {
  import spark.implicits._
  // Read each file separately, widen myField to double, then union the results
  paths.par.map { path =>
    spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType))
  }.reduce(_.union(_))
}
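For example, with the paths from the question (a sketch, assuming the variables defined earlier):
val result = merge(spark, Seq(fileWithInt, fileWithDouble)).select("myField")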
This approach is better for processing a large number of files, since unioning once at the RDD level keeps the lineage short:
// Uses the same imports as merge above.
def merge2(spark: SparkSession, paths: Seq[String]): DataFrame = {
  import spark.implicits._
  // Dropping to RDD[Double] lets SparkContext.union combine all the files in a
  // single step. Assumes each file holds only the myField column.
  spark.sparkContext.union(paths.par.map { path =>
    spark.read.parquet(path)
      .withColumn("myField", $"myField".cast(DoubleType))
      .as[Double]
      .rdd
  }.toList).toDF("myField") // restore the column name lost in the RDD round-trip
}
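Usage is the same (a sketch with the question's paths):
val result = merge2(spark, Seq(fileWithInt, fileWithDouble))
result.printSchema() // myField: double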