
How to handle changing parquet schema in Apache Spark


I have run into a problem where I have Parquet data as daily chunks in S3 (in the form of s3://bucketName/prefix/YYYY/MM/DD/), but I cannot read data from different dates with Spark on AWS EMR because some column types do not match and I get one of many exceptions, for example:

java.lang.ClassCastException: optional binary element (UTF8) is not a group 

This appears when, in some files, a column is an array type that contains values, while in other files the same column holds only null values and is therefore inferred as a String type.

or

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal): org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true) 

I have raw data in S3 in JSON format and my initial plan was to create an automatic job, which starts an EMR cluster, reads in the JSON data for the previous date and simply writes it as parquet back to S3.

The JSON data is also divided by date, i.e. the keys have date prefixes. Reading the JSON works fine: the schema is inferred from the data, no matter how much data is currently being read.
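
For illustration, a minimal sketch of such a daily job, assuming hypothetical bucket/prefix names and a hard-coded date (in a real job the date would come from the scheduler), could look like this:

import org.apache.spark.sql.SparkSession

object DailyJsonToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("daily-json-to-parquet").getOrCreate()

    // Hypothetical layout: both the raw JSON and the Parquet output are keyed by date.
    val date = "2016/12/01"
    val jsonPath = s"s3://bucketName/json/$date/"
    val parquetPath = s"s3://bucketName/prefix/$date/"

    // The schema is inferred from that single day's JSON only.
    val df = spark.read.json(jsonPath)

    // Write the day's chunk back to S3 as Parquet.
    df.write.mode("overwrite").parquet(parquetPath)

    spark.stop()
  }
}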

But the problem arises when the Parquet files are written. As I understand it, when I write Parquet with metadata files, these files contain the schema for all parts/partitions of the Parquet data, and it seems those parts can have different schemas. When I disable writing metadata, Spark is said to infer the whole schema from the first file under the given Parquet path and presume it stays the same across the other files.

When some columns that should be of double type contain only integer values for a given day, reading them in from JSON (where these numbers are integers, without decimal points) makes Spark infer them as long columns. Even if I cast these columns to double before writing the Parquet files, this is still not good: the schema might change, new columns can be added, and tracking all of this is impossible.
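
One partial workaround, sketched below with made-up column names, is to read the JSON with an explicit schema instead of relying on inference, so that numeric columns always come out as double; as noted above, though, this only helps while the schema is known and stable:

import org.apache.spark.sql.types._

// Hypothetical schema: "price" must always be double, even on days when
// the JSON happens to contain only whole numbers.
val explicitSchema = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("name", StringType, nullable = true)))

val df = spark.read.schema(explicitSchema).json("s3://bucketName/json/2016/12/01/")
df.write.mode("overwrite").parquet("s3://bucketName/prefix/2016/12/01/")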

I have seen other people run into the same problems, but I have yet to find a good enough solution.

What are the best practices or solutions for this?

Asked Dec 02 '16 by V. Samma

People also ask

How do you handle schema evolution in Parquet?

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas.

Can we change the schema of Parquet file?

Schema merging is a relatively expensive operation and is not a necessity in most cases, so Spark turned it off by default starting from version 1.5.0 (you can always switch it back on). Without schema merging, Spark reads the schema from one Parquet file and assumes it stays the same while reading the rest of the files.

Can Parquet support schema evolution in Spark?

With schema evolution, one set of data can be stored in multiple files with different but compatible schema. In Spark, Parquet data source can detect and merge schema of those files automatically.
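
As a rough sketch of that behaviour, reading all the daily chunks with the Parquet data source's mergeSchema option (the base path is the one from the question, the rest is illustrative):

// Let Spark reconcile the different but compatible schemas of the daily files.
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("s3://bucketName/prefix/")

merged.printSchema()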

How do I change the Spark schema?

You cannot change the schema like this. The schema object passed to createDataFrame has to match the data, not the other way around. To parse timestamp data, use the corresponding functions; see, for example, "Better way to convert a string field into timestamp in Spark".
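
For the timestamp case specifically, a hedged sketch with a hypothetical column name (to_timestamp is available from Spark 2.2; older versions can use unix_timestamp(...).cast("timestamp")):

import org.apache.spark.sql.functions.{col, to_timestamp}

// Parse the string column into a proper TimestampType column instead of
// forcing the type through the schema passed to createDataFrame.
val parsed = df.withColumn("event_time",
  to_timestamp(col("event_time_str"), "yyyy-MM-dd HH:mm:ss"))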


1 Answer

These are the options I use for writing Parquet to S3; turning off schema merging boosts writeback performance, and it may also address your problem:

val PARQUET_OPTIONS = Map(
  "spark.sql.parquet.mergeSchema" -> "false",
  "spark.sql.parquet.filterPushdown" -> "true")
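
One way to apply these settings, as an illustration rather than part of the original answer, is to set them as Spark SQL configuration on the session (or pass them with --conf to spark-submit):

// Apply the same configuration to an existing SparkSession.
PARQUET_OPTIONS.foreach { case (key, value) => spark.conf.set(key, value) }

// Or at submit time:
// spark-submit --conf spark.sql.parquet.mergeSchema=false \
//              --conf spark.sql.parquet.filterPushdown=true ...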
Answered Oct 18 '22 by stevel