I have a JSON-like structure in Spark which looks as follows:
>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string
Depending on the partition, some-array might be an empty array for all ids. When this happens, Spark infers the following schema:
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string
Of course that's a problem if I want to read multiple partitions, because Spark cannot merge the schemas (a sketch of such a read is at the end of this question). I have tried to manually define the schema, so there should be no problem:
>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string
So far so good, but when I try to actually collect the data I get an error:
>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
I don't understand why this fails. There should be no incompatibility caused by the schema. In case you wonder, collecting the data without specifying the schema works.
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string   # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works
Here's a reproducible example in Python:
from pyspark.sql.types import *

myschema = StructType([
    StructField('id', StringType()),
    StructField('some-array',
                ArrayType(StructType([
                    StructField('array-field-1', StringType()),
                    StructField('array-field-2', StringType())
                ])))
])

path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'
dfKO = sc.parallelize([jsonKO])
dfKO = spark.read.json(dfKO)
dfKO.write.parquet(path_writeKO)  # write without schema
read_error = spark.read.schema(myschema).parquet(path_writeKO)  # read with schema
read_error.collect()  # Fails!!
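For reference, the multi-partition read I mentioned above looks roughly like this (just a sketch, the paths stand for my actual partition directories):
>>> df = (spark.read
...           .option("mergeSchema", True)
...           .parquet(good_partition_path, bad_partition_path))
# fails: Spark cannot merge array<struct<...>> with array<string>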
The solution I've found is setting the option dropFieldIfAllNull to True when reading the JSON file. This causes fields with empty arrays to disappear, making schema merging easier.
>>> jsonKO = '{"id": "OK", "some-array": []}'
>>> dfKO = sc.parallelize([jsonKO])
>>> dfKO = spark.read.option('dropFieldIfAllNull', True).json(dfKO)
id: string
Now the undesired type inference won't apply, and when reading multiple partitions of the same dataset the mergeSchema option will be able to read all files without collision.
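For example, once the partitions have been written this way, a combined read could look roughly like this (a sketch reusing the placeholder paths from the question):
>>> df_all = (spark.read
...               .option('mergeSchema', True)
...               .parquet(good_partition_path, bad_partition_path))
>>> df_all.head(5)
# rows coming from partitions where some-array was dropped simply get null
# for that column, while its element type stays the struct from the good schema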
Of course that's a problem if I want to read multiple partitions, because Spark cannot merge the schemas. I have tried to manually define the schema, so there should be no problem.
I am afraid there is no single schema that can parse both of these cases at once. The data {"id": "OK", "some-array": [{"array-field-1":"f1", "array-field-2":"f2"}]} can be parsed only with:
good_schema = StructType([
  StructField('id', StringType()),
  StructField('some-array',
              ArrayType(StructType([
                StructField('array-field-1', StringType()),
                StructField('array-field-2', StringType())
              ])))
])
when {"id": "OK", "some-array": []} with:
bad_schema = StructType([
  StructField('id', StringType()), 
  StructField('some-array', ArrayType(StringType()))
])
Therefore one option is to read these two directories with different schemas.
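A rough sketch of that option, assuming the two kinds of partitions live under separate directories (the paths and the empty-array trick below are only illustrative, not something your pipeline has to use):
from pyspark.sql.functions import from_json, lit

# read each kind of partition with the schema that matches it
good_df = spark.read.schema(good_schema).parquet("path/to/good_partitions")
bad_df = spark.read.schema(bad_schema).parquet("path/to/bad_partitions")

# some-array is always empty in the bad partitions, so it can be replaced
# by an empty array of the struct type before combining the two dataframes
array_type = good_schema["some-array"].dataType
bad_df = bad_df.withColumn("some-array", from_json(lit("[]"), array_type))

combined = good_df.unionByName(bad_df)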
I don't understand why this fails. There should be no incompatibility caused by the schema.
As explained above the data is incompatible with the schema.
In case you wonder, collecting the data without specifying the schema works.
This is the expected behaviour: when no explicit schema is specified, Spark will try to discover it from the data.
Suggested solution
The only solution that I can think of is to treat the some-array field as a string. I don't know if this is feasible in your system, but you could implement it by casting some-array to string for both schemas/partitions.
This conversion can be done in at least a couple of ways. Given the two dataframes:
good_data_df = spark.read.schema(good_schema).parquet(...)
bad_data_df = spark.read.schema(bad_schema).parquet(...)
cast the some-array field to string, then save the results under one common directory:
from pyspark.sql.functions import col

good_data_df = good_data_df.withColumn("some-array", col("some-array").cast("string"))
bad_data_df = bad_data_df.withColumn("some-array", col("some-array").cast("string"))
good_data_df.union(bad_data_df).write.mode("overwrite").parquet("parquet_path")
Finally, you can load some-array as a string and then convert it to array_schema using the from_json function:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
array_schema = ArrayType(StructType([
                StructField('array-field-1', StringType()),
                StructField('array-field-2', StringType())]))
# we will use this for both partitions
generic_schema = StructType([
  StructField('id', StringType()), 
  StructField('some-array', StringType())
])
parquet_path = "/tmp/60297547/parquet"
good_data = "{'id': 'OK', 'some-array': \"[{'array-field-1':'f1a','array-field-2':'f2a'},{'array-field-1':'f1b','array-field-2':'f2b'}]\"}"
bad_data = "{'id': 'OK', 'some-array': '[]'}"
# putting bad and good partitions into the same dataset where some-array is string
rdd = sc.parallelize([bad_data, good_data])
df = spark.read.json(rdd)
df.write.mode("overwrite").parquet(parquet_path)
final_df = spark.read.schema(generic_schema).parquet(parquet_path)
final_df = final_df.withColumn("some-array", from_json(final_df["some-array"], array_schema))
final_df.show(10, False)
# +---+------------------------+
# |id |some-array              |
# +---+------------------------+
# |OK |[[f1a, f2a], [f1b, f2b]]|
# |OK |[]                      |
# +---+------------------------+
final_df.printSchema()
# root
#  |-- id: string (nullable = true)
#  |-- some-array: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- array-field-1: string (nullable = true)
#  |    |    |-- array-field-2: string (nullable = true)