Combining Spark schema without duplicates?

To process my data, I extract the schema beforehand, so that when I read the dataset I can supply the schema instead of going through the expensive step of inferring it.

In order to construct the final schema, I need to merge several different schemas into it, so I have been using the union (++) and distinct methods, but I keep getting an org.apache.spark.sql.AnalysisException: Duplicate column(s) exception.

For example, say we have three schemas with the following structure:

import org.apache.spark.sql.types._

val schema1 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
    ), true) :: Nil)

val schema2 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) :: Nil
    ), true) :: Nil)

val schema3 = StructType(StructField("A", StructType(
    StructField("i", StringType, true) ::
    StructField("ii", StringType, true) :: Nil
    ), true) :: Nil)

val final_schema = (schema1 ++ schema2 ++ schema3).distinct

println(final_schema)

which outputs:

StructType(
    StructField(A,StructType(
         StructField(i,StringType,true)),true), 
    StructField(A,StructType(
        StructField(i,StringType,true),    
        StructField(ii,StringType,true)),true))

I understand that distinct only filters out schema structures that exactly match another. However, I want the result to look like this:

StructType(
    StructField(A,StructType(
        StructField(i,StringType,true),    
        StructField(ii,StringType,true)),true))

in which the duplicate fields get "combined" into one schema. I have sifted through the methods in the Scala documentation, but I cannot seem to find the right one to solve this. Any ideas?

EDIT:

The end goal is to feed final_schema into sqlContext.read.schema and read an RDD of JSON strings with the read method.
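For reference, applying a precomputed schema to a JSON read looks roughly like this. This is a sketch, not code from the question: sqlContext and jsonRDD are assumed to already exist in your Spark application.

```scala
// Assumes sqlContext: org.apache.spark.sql.SQLContext and
// jsonRDD: org.apache.spark.rdd.RDD[String] are already in scope.
// Supplying the schema skips Spark's schema-inference pass over the data.
val df = sqlContext.read.schema(final_schema).json(jsonRDD)
```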

asked Mar 13 '26 by THIS USER NEEDS HELP
1 Answer

Spark with Scala:

val consolidatedSchema = test1Df.schema.++:(test2Df.schema).toSet
val uniqueConsolidatedSchemas = StructType(consolidatedSchema.toSeq)

Spark with Java:

StructType consolidatedSchema = test1Df.schema().merge(test2Df.schema());
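Note that a set-based union only removes fields that are exactly equal, so it will not combine the two nested "A" structs from the question. For that, the merge has to recurse into nested StructTypes. A minimal sketch follows; deepMerge is a hypothetical helper written here for illustration, not part of Spark's API, and it does not attempt to resolve conflicting leaf types.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: recursively merges two StructTypes, unioning
// fields by name and descending into nested structs on a name collision.
def deepMerge(left: StructType, right: StructType): StructType = {
  val rightByName = right.fields.map(f => f.name -> f).toMap
  // Keep left-hand fields; where a name collides and both sides are
  // structs, merge them recursively. Conflicting leaf types are left
  // as the left-hand type and would need an explicit resolution rule.
  val mergedLeft = left.fields.map { lf =>
    rightByName.get(lf.name) match {
      case Some(rf) =>
        (lf.dataType, rf.dataType) match {
          case (ls: StructType, rs: StructType) =>
            lf.copy(dataType = deepMerge(ls, rs))
          case _ => lf
        }
      case None => lf
    }
  }
  // Append right-hand fields the left side did not have.
  val leftNames = left.fields.map(_.name).toSet
  StructType(mergedLeft ++ right.fields.filterNot(f => leftNames(f.name)))
}
```

With the question's schemas, Seq(schema2, schema3).foldLeft(schema1)(deepMerge) would yield a single "A" struct containing both "i" and "ii".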
answered Mar 15 '26 by Rituparno Behera