What is the best way to manage Spark tables' schemas? Do you see any drawbacks of Option 2? Can you suggest any better alternatives?
Option 1: keep separate definitions for code and for metastore
The drawback of this approach is that you have to continuously keep the two definitions in sync (error prone). Another drawback: it gets cumbersome if the table has 500 columns.
create_some_table.sql [1st definition]
-- Databricks syntax (internal metastore)
CREATE TABLE IF NOT EXISTS some_table (
    Id int,
    Value string,
    ...
    Year int
)
USING PARQUET
PARTITIONED BY (Year)
OPTIONS (
    PATH 'abfss://...'
)
some_job.py [2nd definition]
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def run():
    df = spark.read.table('input_table')  # 500 columns
    df = transform(df)
    # this logic belongs in `transform`, but in any case it has to live somewhere
    df = df.select(
        'Id', 'Year', F.col('Value').cast(StringType()).alias('Value')
        # effectively another schema definition: you have to enumerate all output columns
    )
    df.write.saveAsTable('some_table')
test_some_job.py [3rd definition]
def test_some_job(spark):
    output_schema = ...  # another definition
    expected = spark.createDataFrame([...], output_schema)
Option 2: keep only one definition in code (StructType)
It's possible to generate the schema on the fly. The benefit of this approach is simplicity: the schema is defined in a single place. Do you see any drawbacks?
from typing import List, NamedTuple

from pyspark.sql import DataFrame
from pyspark.sql.types import StructType


class Table(NamedTuple):
    name: str
    path: str
    partition_by: List[str]
    schema: StructType


def run(input: Table, output: Table):
    df = spark.read.table(input.name)
    df = transform(df)
    save(df, output)


def save(df: DataFrame, table: Table):
    df \
        .select(table.schema.fieldNames()) \
        .write \
        .partitionBy(table.partition_by) \
        .option('path', table.path) \
        .saveAsTable(table.name)
    # In case the table doesn't exist, Databricks will automatically generate the table definition
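One way to make Option 2 truly single-source is to also generate the metastore DDL from the same StructType. A minimal sketch, assuming a Table instance as defined above; the create_table helper and its use of simpleString() are my own illustration, not part of the original code:

# Hypothetical helper: derive the CREATE TABLE statement from the single StructType,
# so the metastore never needs a second, hand-written definition.
def create_table(spark, table: Table) -> None:
    columns = ',\n        '.join(
        f'{f.name} {f.dataType.simpleString()}' for f in table.schema.fields
    )
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table.name} (
        {columns}
        )
        USING PARQUET
        PARTITIONED BY ({', '.join(table.partition_by)})
        OPTIONS (PATH '{table.path}')
    """)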
The industry solution to handling schema evolution is to store the schema together with the data: whoever writes the data writes both schema and data, and whoever reads it first reads the schema and then reads the data according to that schema.
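Self-describing formats like Parquet and Avro already work this way, since the schema travels inside the files. A quick illustration (the path is a placeholder):

# Parquet stores the schema in the file footer, so a reader recovers it
# without any external table definition.
df = spark.range(3).withColumnRenamed('id', 'Id')
df.write.mode('overwrite').parquet('/tmp/schema_demo')
spark.read.parquet('/tmp/schema_demo').printSchema()  # schema comes back from the data files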
saveAsTable("t") . When the table is dropped, the custom table path will not be removed and the table data is still there. If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. When the table is dropped, the default table path will be removed too.
Schema evolution allows users to easily change the current schema of a Hudi table to adapt to data that changes over time. As of the 0.11.0 release, Spark SQL (Spark 3.1.x, 3.2.1 and above) DDL support for schema evolution has been added and is experimental.
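For illustration, evolving a Hudi table through Spark SQL DDL could look roughly like this; the table name is hypothetical and the hoodie.schema.on.read.enable setting is an assumption based on Hudi's documented configuration, so check the Hudi docs for your version:

# Sketch only: add a column to a Hudi table via Spark SQL DDL, assuming the
# SparkSession was created with the Hudi Spark SQL extensions enabled.
spark.sql('SET hoodie.schema.on.read.enable=true')  # assumed config for schema-on-read evolution
spark.sql('ALTER TABLE some_hudi_table ADD COLUMNS (Extra string)')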
Let me first make a few points (about databricks and aws-glue), then a recommendation.

Recommendation: change USING PARQUET to USING DELTA in the table definition.
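Under Option 2, that switch is essentially a one-line change in save(); a sketch, where mergeSchema is only an illustration of Delta's built-in schema evolution, not something from the original code:

# Sketch: same save() flow as above, but writing Delta instead of Parquet.
# Delta keeps the schema in its transaction log and can evolve it on write.
df.write \
    .format('delta') \
    .option('mergeSchema', 'true') \
    .partitionBy(table.partition_by) \
    .option('path', table.path) \
    .saveAsTable(table.name)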