 

Spark schema management in a single place

Tags: databricks, aws-glue

Question

What is the best way to manage Spark tables' schemas? Do you see any drawbacks of Option 2? Can you suggest any better alternatives?

Solutions I see

Option 1: keep separate definitions for code and for metastore

The drawback of this approach is that you have to continuously keep them in sync (error prone). Another drawback: it gets cumbersome if the table has 500 columns.

create_some_table.sql [1st definition]

-- Databricks syntax (internal metastore)
CREATE TABLE IF NOT EXISTS some_table (
  Id int,
  Value string,
  ...
  Year int
)
USING PARQUET
PARTITIONED BY (Year)
OPTIONS (
  PATH 'abfss://...'
)

some_job.py [2nd definition]

from pyspark.sql import functions as F
from pyspark.sql.types import StringType


def run():
    df = spark.read.table('input_table')  # 500 columns
    df = transform(df)
    # this logic belongs in `transform`, but either way it has to exist somewhere
    df = df.select(
        'Id', 'Year', F.col('Value').cast(StringType()).alias('Value')  # effectively another schema definition: you have to enumerate all output columns
    )
    df.write.saveAsTable('some_table')

test_some_job.py [3rd definition]

def test_some_job(spark):
    output_schema = ...  # another definition
    expected = spark.createDataFrame([...], output_schema)
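For illustration, a minimal sketch of what that third definition tends to look like once filled in (the columns here are hypothetical stand-ins, not the real 500-column schema, which would all have to be re-enumerated and kept in sync by hand):

# Hypothetical duplicated schema; column names are illustrative assumptions
from pyspark.sql import types as T

def test_some_job(spark):
    output_schema = T.StructType([
        T.StructField('Id', T.IntegerType()),
        T.StructField('Value', T.StringType()),
        T.StructField('Year', T.IntegerType()),
    ])
    expected = spark.createDataFrame([(1, 'a', 2020)], output_schema)
    # ... run the job and compare the actual output against `expected` ...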

Option 2: keep only one definition in code (StructType)

It's possible to generate the schema on the fly. The benefit of this method is its simplicity and having the schema defined in a single place. Do you see any drawbacks?

from typing import List, NamedTuple

from pyspark.sql import DataFrame
from pyspark.sql.types import StructType


class Table(NamedTuple):
    name: str
    path: str
    partition_by: List[str]
    schema: StructType


def run(input: Table, output: Table):
    df = spark.read.table(input.name)
    df = transform(df)
    save(df, output)


def save(df: DataFrame, table: Table):
    df \
        .select(table.schema.fieldNames()) \
        .write \
        .partitionBy(table.partition_by) \
        .option('path', table.path) \
        .saveAsTable(table.name)
    # If the table doesn't exist, Databricks will automatically generate the table definition
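For completeness, here is a rough usage sketch of Option 2; the concrete Table instances and the calling code below are illustrative assumptions, not part of the original job:

from pyspark.sql import types as T

# Hypothetical table definition: the single place where the output schema lives
SOME_TABLE = Table(
    name='some_table',
    path='abfss://...',  # path elided, as in the question
    partition_by=['Year'],
    schema=T.StructType([
        T.StructField('Id', T.IntegerType()),
        T.StructField('Value', T.StringType()),
        T.StructField('Year', T.IntegerType()),
    ]),
)

INPUT_TABLE = Table(
    name='input_table',
    path='abfss://...',
    partition_by=[],
    schema=T.StructType([]),  # not needed when reading by table name
)

run(INPUT_TABLE, SOME_TABLE)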
asked Aug 14 '20 by VB_



1 Answer

Let me first make a few points, then a recommendation.

  1. Data lives a lot longer than code.
  2. The code described above creates and writes the data; the code that reads and consumes the data also needs to be considered.
  3. There's a 3rd option: storing the definition of the data (the schema) with the data itself, often called a 'self-describing format'.
  4. The structure of data can change over time.
  5. This question is tagged with databricks and aws-glue.
  6. Parquet is self-describing on a file-by-file basis.
  7. Delta Lake tables use Parquet data files but additionally embed the schema into the transaction log, so the entire table and its schema are versioned.
  8. Data needs to be used by a wide ecosystem of tools, so it needs to be discoverable and the schema should not be locked into a single compute engine.

Recommendation:

  1. Store the schema with the data in an open format
  2. Use Delta Lake format (which combines Parquet and a transaction log)
  3. Change USING PARQUET to USING DELTA (a minimal writer sketch follows this list)
  4. Point your metastore to the AWS Glue Catalog; the Glue Catalog will store the table name and location
  5. Consumers will resolve the schema from the Delta Lake table transaction log
  6. Schema can evolve as the writer code evolves.
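A minimal writer-side sketch of points 2-6, assuming the Option 2 `save` helper from the question and a cluster with Delta Lake available (as on Databricks). `mergeSchema` is a standard Delta Lake write option; the rest is carried over from the question:

def save(df: DataFrame, table: Table):
    (df
        .select(table.schema.fieldNames())
        .write
        .format('delta')                # instead of the Parquet format / USING PARQUET
        .option('mergeSchema', 'true')  # allow additive schema evolution on write
        .option('path', table.path)
        .partitionBy(table.partition_by)
        .mode('append')
        .saveAsTable(table.name))       # registers name + location in the metastore (e.g. Glue Catalog)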

Results:

  1. Your writer creates the schema, and may optionally evolve the schema
  2. All consumers will find the schema (paired with the table version) in the Delta Lake transaction log (the _delta_log directory, to be specific); a short read-side sketch follows below
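A read-side sketch under the same assumptions ('some_table' is the hypothetical table name used above); consumers get the schema from the Delta transaction log instead of a hand-maintained StructType:

# Consumers resolve the schema from the table itself
df = spark.read.table('some_table')
df.printSchema()  # schema served from the Delta transaction log (_delta_log)

# Tools that bypass the metastore can read by path and still resolve the schema
df = spark.read.format('delta').load('abfss://...')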
answered Sep 18 '22 by Douglas M