I would like to merge 2 dataframes with (potentially) mismatching schemas
A: org.apache.spark.sql.DataFrame = [name: string, age: int, height: int]
B: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> A.unionAll(B)
would result in:
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 3 columns and the right has 2;
I would like to do this from within Spark.
However, the Spark docs only propose writing both dataframes out to a directory and reading them back in using spark.read.option("mergeSchema", "true") (see the schema merging section of the Parquet data source documentation).
So a union doesn't help me out, and neither does the documentation. I would like to keep this extra I/O out of my job if at all possible. Am I missing some undocumented info, or is it not possible (yet)?
Parquet schema merging is disabled by default; you can turn it on either by:
(1) setting the global option spark.sql.parquet.mergeSchema=true, or
(2) in code: sqlContext.read.option("mergeSchema", "true").parquet("my.parquet")
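For reference, a minimal sketch of that write-then-read route, assuming the A and B frames from the question and a hypothetical /tmp/people path (note this is exactly the extra I/O the question wants to avoid):
// Write each frame into its own partition subdirectory, then read the parent
// directory back with schema merging enabled.
A.write.parquet("/tmp/people/key=1")
B.write.parquet("/tmp/people/key=2")
val merged = sqlContext.read.option("mergeSchema", "true").parquet("/tmp/people")
merged.printSchema()  // name, age, height, plus the partition column key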
You can append null columns to frame B for the fields it is missing and then union the two frames:
import org.apache.spark.sql.functions._
// Add each column that B lacks (relative to A) as a typed null, then union.
val missingFields = A.schema.toSet.diff(B.schema.toSet)
var C = B
for (field <- missingFields) {
  C = C.withColumn(field.name, lit(null).cast(field.dataType))
}
A.unionAll(C)
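If fields can be missing on either side, a more general sketch (my own, not part of the answer above; assuming Spark 2.x, where unionAll is replaced by union) pads both frames with typed nulls and realigns the column order, since union matches columns by position:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
// Pad each frame with typed nulls for the fields only the other one has,
// then reorder b to a's column order before the positional union.
def unionByPaddingNulls(a: DataFrame, b: DataFrame): DataFrame = {
  val aPadded = b.schema.filterNot(f => a.columns.contains(f.name))
    .foldLeft(a)((df, f) => df.withColumn(f.name, lit(null).cast(f.dataType)))
  val bPadded = a.schema.filterNot(f => b.columns.contains(f.name))
    .foldLeft(b)((df, f) => df.withColumn(f.name, lit(null).cast(f.dataType)))
  aPadded.union(bPadded.select(aPadded.columns.map(col): _*))
}
unionByPaddingNulls(A, B).printSchema()  // name, age, height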
Here's a pyspark solution.
It assumes that if the merge can't take place because one dataframe is missing a column contained in the other, then the right thing is to add the missing column with null values.
On the other hand, if the merge can't take place because the two dataframes share a column with conflicting type or nullability, then the right thing is to raise a TypeError (because that's a conflict you probably want to know about).
from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            # The name exists on the right, so the mismatch is a conflict.
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        # The column is genuinely missing on the right: add it as a typed null.
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))

    # union matches columns by position, so align the column order before combining
    return df_left.union(df_right.select(df_left.columns))