Merge Schema with int and double cannot be resolved when reading parquet file

I've got two parquet files: one contains an integer field myField and the other contains a double field myField. When attempting to read both files at once:

val basePath = "/path/to/file/"
val fileWithInt = basePath + "intFile.snappy.parquet"
val fileWithDouble = basePath + "doubleFile.snappy.parquet"
val result = spark.sqlContext.read
  .option("mergeSchema", true)
  .option("basePath", basePath)
  .parquet(Seq(fileWithInt, fileWithDouble): _*)
  .select("myField")

I get the following error

Caused by: org.apache.spark.SparkException: Failed to merge fields 'myField' and 'myField'. Failed to merge incompatible data types IntegerType and DoubleType
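
For reference, two such files can be generated with something like this (a minimal sketch; note that write.parquet creates a directory with the given name rather than a single file, but reading it back behaves the same):

import spark.implicits._

// One input with an integer myField, one with a double myField
Seq(1, 2, 3).toDF("myField").write.parquet(basePath + "intFile.snappy.parquet")
Seq(1.5, 2.5).toDF("myField").write.parquet(basePath + "doubleFile.snappy.parquet")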

When passing an explicit schema

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(StructField("myField", IntegerType)))
val result = spark.sqlContext.read
  .schema(schema)
  .option("mergeSchema", true)
  .option("basePath", basePath)
  .parquet(Seq(fileWithInt, fileWithDouble): _*)
  .select("myField")

It fails with the following

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
    at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)

When widening the schema to a double instead

val schema = StructType(Seq(StructField("myField", DoubleType)))

I get

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
    at org.apache.parquet.column.Dictionary.decodeToDouble(Dictionary.java:60)

Does anyone know a way around this problem other than reprocessing the source data?

asked Dec 18 '18 by John Cragg

1 Answer

Depending on the number of files you are going to read, you can use one of these two approaches.

The first works best for a smaller number of parquet files:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.DoubleType

def merge(spark: SparkSession, paths: Seq[String]): DataFrame = {
  import spark.implicits._

  // Read each file separately, cast myField to double, then union the results
  paths.par.map { path =>
    spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType))
  }.reduce(_.union(_))
}

The second approach is better for a large number of files, since a single sparkContext.union keeps the lineage short instead of building a deeply nested chain of unions:

def merge2(spark: SparkSession, paths: Seq[String]): DataFrame = {
  import spark.implicits._

  // Union at the RDD level in one step to avoid a deeply nested plan;
  // select ensures a single double column so .as[Double] is valid
  spark.sparkContext.union(paths.par.map { path =>
    spark.read.parquet(path)
      .select($"myField".cast(DoubleType))
      .as[Double]
      .rdd
  }.toList).toDF("myField")
}
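
With the two files from the question, either helper can then be called like this (a hypothetical usage sketch, assuming the helpers above are in scope):

// Hypothetical usage with the paths from the question
val basePath = "/path/to/file/"
val paths = Seq(basePath + "intFile.snappy.parquet", basePath + "doubleFile.snappy.parquet")

val result = merge2(spark, paths)   // or merge(spark, paths)
result.select("myField").show()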
answered Sep 23 '22 by Mikel San Vicente