Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why are new columns added to parquet tables not available from glue pyspark ETL jobs?

We've been exploring using Glue to transform some JSON data to parquet. One scenario we tried was adding a column to the parquet table. So partition 1 has columns [A] and partition 2 has columns [A,B]. Then we wanted to write further Glue ETL jobs to aggregate the parquet table but the new column was not available. Using glue_context.create_dynamic_frame.from_catalog to load the dynamic frame our new column was never in the schema.

We tried several configurations for our table crawler. Using a single schema for all partitions, single schema for s3 path, schema per partition. We could always see the new column in the Glue table data but it was always null if we queried it from a Glue job using pyspark. The column was in the parquet when we downloaded some samples and available for querying via Athena.

Why are the new columns not available to pyspark?

like image 686
roby Avatar asked Apr 09 '19 04:04

roby


1 Answers

This turned out to be a spark configuration issue. From the spark docs:

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we turned it off by default starting from 1.5.0. You may enable it by

  1. setting data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or
  2. setting the global SQL option spark.sql.parquet.mergeSchema to true.

We could enable schema merging in two ways.

  1. set the option on the spark session spark.conf.set("spark.sql.parquet.mergeSchema", "true")
  2. set mergeSchema to true in the additional_options when loading the dynamic frame.
source = glueContext.create_dynamic_frame.from_catalog(
   database="db",
   table_name="table",
   additional_options={"mergeSchema": "true"}
)

After that the new column was available in the frame's schema.

like image 172
roby Avatar answered Oct 31 '22 14:10

roby