
Spark merge dataframe with mismatching schemas without extra disk IO

I would like to merge two DataFrames with (potentially) mismatching schemas:

org.apache.spark.sql.DataFrame = [name: string, age: int, height: int]
org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> A.unionAll(B)

would result in:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 2 columns and the right has 3;

I would like to do this from within Spark. However, the Spark docs only propose writing both DataFrames out to a directory and reading them back in using spark.read.option("mergeSchema", "true").

link to docs

So a union doesn't help me out, and neither does the documentation. I would like to keep this extra I/O out of my job if at all possible. Am I missing some undocumented info, or is it not possible (yet)?

Havnar asked Oct 05 '16 08:10


3 Answers

Parquet schema merging is disabled by default. You can turn it on in either of two ways:

(1) Set the global option: spark.sql.parquet.mergeSchema=true

(2) Set it per read in code: sqlContext.read.option("mergeSchema", "true").parquet("my.parquet")
Gary Gauh answered Oct 17 '22 05:10


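You can add the columns that B is missing as null columns and then union the two frames (this assumes every column of B also exists in A):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Fields of A whose names do not appear in B
val missingFields = A.schema.filterNot(f => B.columns.contains(f.name))
// Add each missing field to B as a null column of the matching type
val C: DataFrame = missingFields.foldLeft(B) { (df, field) =>
  df.withColumn(field.name, lit(null).cast(field.dataType))
}
// unionAll matches columns by position, so put C's columns in A's order first
A.unionAll(C.select(A.columns.map(col): _*))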
prudenko answered Oct 17 '22 06:10


Here's a pyspark solution.

It assumes that if the merge can't take place because one dataframe is missing a column contained in the other, then the right thing is to add the missing column with null values.

On the other hand, if the merge can't take place because the two dataframes share a column with conflicting type or nullability, then the right thing is to raise a TypeError (because that's a conflict you probably want to know about).

from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        # Field exists only on the left: add it to the right as a typed null column
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        # Field exists only on the right: add it to the left as a typed null column
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))

    # union() matches columns by position, so align the right frame to the left frame's column order
    return df_left.union(df_right.select(df_left.columns))
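
To make the decision rule easier to follow without a Spark session, here is a minimal plain-Python sketch of the same idea. It is an illustration only: schemas are represented as lists of (name, type_name) pairs, which is an assumption and not a Spark type, the function name plan_schema_merge is hypothetical, and nullability is ignored for brevity.

```python
def plan_schema_merge(left_schema, right_schema):
    """Decide which null columns each side needs before a union.

    Each schema is a list of (name, type_name) pairs. This stand-in for a
    Spark schema is an assumption made for illustration.
    Returns (add_to_left, add_to_right, merged_order).
    """
    left_types = dict(left_schema)
    right_types = dict(right_schema)

    # A column present on both sides must have the same type.
    for name, l_type in left_schema:
        r_type = right_types.get(name, l_type)
        if r_type != l_type:
            raise TypeError(
                "Type conflict on field %s: left %s, right %s" % (name, l_type, r_type)
            )

    # Columns each side lacks, to be added as typed nulls, in original order.
    add_to_right = [(n, t) for n, t in left_schema if n not in right_types]
    add_to_left = [(n, t) for n, t in right_schema if n not in left_types]

    # Both sides must be selected in this order before a positional union.
    merged_order = [n for n, _ in left_schema] + [n for n, _ in add_to_left]
    return add_to_left, add_to_right, merged_order


a = [("name", "string"), ("age", "int"), ("height", "int")]
b = [("name", "string"), ("age", "int")]
print(plan_schema_merge(a, b))
```

With the schemas from the question, the plan says to add height to the second frame as a null int column and to select both frames in the order name, age, height before the union.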
conradlee answered Oct 17 '22 06:10