I have run into a problem: I have Parquet data stored as daily chunks in S3 (in the form s3://bucketName/prefix/YYYY/MM/DD/), but I cannot read data from different dates in Spark on AWS EMR because some column types do not match, and I get one of many exceptions, for example:
java.lang.ClassCastException: optional binary element (UTF8) is not a group
appears when, in some files, a column is an array type with values, while in other files the same column is null and is therefore inferred as a String type.
or
org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal): org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)
I have raw data in S3 in JSON format, and my initial plan was to create an automated job that starts an EMR cluster, reads the JSON data for the previous date, and simply writes it back to S3 as Parquet.
The JSON data is also divided by date, i.e. the keys have date prefixes. Reading the JSON works fine: the schema is inferred from the data no matter how much data is currently being read.
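Roughly, the job does something like this (paths and the date are illustrative; I'm assuming a Spark 2.x SparkSession named spark, on 1.x it would be sqlContext):
// Read one day of raw JSON and write it back out as that day's Parquet chunk.
val day = "2016/01/01"                                              // illustrative date
val json = spark.read.json(s"s3://bucketName/json-prefix/$day/")    // made-up JSON prefix
json.write.parquet(s"s3://bucketName/prefix/$day/")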
But the problem arises when the Parquet files are written. As I understand it, when I write Parquet with metadata files, those files contain the schema for all parts/partitions of the Parquet data, and it seems to me those schemas can differ from one another. When I disable writing metadata, Spark is said to infer the whole schema from the first file within the given Parquet path and to presume it stays the same across the other files.
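(For reference, this is how I turn the summary metadata files off; the key comes from parquet-mr, so take it as a sketch rather than the only way:)
// Stop Spark from writing _metadata/_common_metadata summary files alongside the data.
spark.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")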
When some columns, which should be of type double, happen to have only integer values for a given day, reading them from JSON (which stores these numbers as integers, without a decimal point) makes Spark infer the column as type long. Even if I cast these columns to double before writing the Parquet files, this still is not good, because the schema might change, new columns can be added, and tracking all of this is impossible.
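(What I mean by casting, continuing the sketch above; the column name is made up:)
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType
// Force a numeric column to double before writing, so a day that only contains
// whole numbers does not come out as long in that day's Parquet files.
val fixed = json.withColumn("price", col("price").cast(DoubleType))
fixed.write.parquet(s"s3://bucketName/prefix/$day/")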
I have seen that other people have run into the same problems, but I have yet to find a good enough solution.
What are the best practices or solutions for this?
Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas.
Since schema merging is a relatively expensive operation, and is not a necessity in most cases, it is turned off by default starting from 1.5.0 (you can always switch it back on). Without schema merging enabled, Spark reads the schema from one Parquet file and, while reading the rest of the files, assumes it stays the same.
With schema evolution, one set of data can be stored in multiple files with different but compatible schemas. In Spark, the Parquet data source can detect and merge the schemas of those files automatically.
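If you do want merging for a particular read, this is roughly how you enable it (the path is illustrative, and I'm assuming a Spark 2.x SparkSession named spark; on 1.x use sqlContext.setConf for the second form):
// Per-read: ask the Parquet source to merge the schemas of all files it reads.
val merged = spark.read.option("mergeSchema", "true").parquet("s3://bucketName/prefix/")
// Or session-wide, via the SQL configuration key:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")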
You cannot change the schema like this. The schema object passed to createDataFrame has to match the data, not the other way around. To parse timestamp data, use the corresponding functions, for example as in Better way to convert a string field into timestamp in Spark.
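For the timestamp case, the idea from that linked answer is roughly this (a sketch; the column name and format are made up, and df is whatever DataFrame you are working with):
import org.apache.spark.sql.functions.{col, unix_timestamp}
// Parse a string column into a proper timestamp with built-in functions instead of
// trying to force it through the schema ("event_time" and the format are made up).
val withTs = df.withColumn(
  "event_time",
  unix_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))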
These are the options I use when writing Parquet to S3; turning off schema merging boosts write performance, and it may also address your problem:
val PARQUET_OPTIONS = Map(
  "spark.sql.parquet.mergeSchema" -> "false",
  "spark.sql.parquet.filterPushdown" -> "true")
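One way to apply that map, since these are session-level SQL configuration keys rather than per-write options (a sketch, assuming a Spark 2.x SparkSession named spark; on 1.x, sqlContext.setConf does the same):
// Apply each key/value pair to the session's SQL configuration.
PARQUET_OPTIONS.foreach { case (k, v) => spark.conf.set(k, v) }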