I have a JSON-like structure in Spark which looks as follows:
>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string
Depending on the partition, some-array might be an empty array for all ids. When this happens, Spark infers the following schema:
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string
Of course, that's a problem if I want to read multiple partitions, because Spark cannot merge the schemas. I have tried to define the schema manually, so there should be no problem:
>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string
So far so good, but when I try to actually collect the data I get an error:
>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
I don't understand why this fails. There should be no incompatibility caused by the schema. In case you wonder, collecting the data without specifying the schema works.
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string   # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works
Here is a reproducible example in Python:
from pyspark.sql.types import *

myschema = StructType([
    StructField('id', StringType()),
    StructField('some-array', ArrayType(StructType([
        StructField('array-field-1', StringType()),
        StructField('array-field-2', StringType())
    ])))
])

path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'
dfKO = sc.parallelize([jsonKO])
dfKO = spark.read.json(dfKO)
dfKO.write.parquet(path_writeKO)                                # write without schema
read_error = spark.read.schema(myschema).parquet(path_writeKO)  # read with schema
read_error.collect()                                            # Fails!!
The solution I've found is setting the option dropFieldIfAllNull to True when reading the JSON file. This causes fields that only ever contain empty arrays (or nulls) to disappear from the inferred schema, which makes schema merging easier.
>>> jsonKO = '{"id": "OK", "some-array": []}'
>>> dfKO = sc.parallelize([jsonKO])
>>> dfKO = spark.read.option('dropFieldIfAllNull', True).json(dfKO)
id: string
Now the undesired type inference no longer applies, and when reading multiple partitions of the same dataset the mergeSchema option will be able to read all files without collision.
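For illustration, a minimal sketch of the follow-up read, assuming both partitions were rewritten from JSON that was loaded with dropFieldIfAllNull; the partition paths are hypothetical placeholders:
# read both partitions; the "bad" one simply has no some-array column anymore
df_all = (spark.read
          .option("mergeSchema", True)
          .parquet("path/to/partition=good", "path/to/partition=bad"))
df_all.printSchema()
# some-array keeps its struct element type and is null for rows coming
# from partitions where the field was dropped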
Of course, that's a problem if I want to read multiple partitions, because Spark cannot merge the schemas. I have tried to define the schema manually, so there should be no problem
I am afraid there is no single schema that can parse both of these cases simultaneously. The data {"id": "OK", "some-array": [{"array-field-1":"f1", "array-field-2":"f2"}]}
can be parsed only with:
good_schema = StructType([
    StructField('id', StringType()),
    StructField('some-array', ArrayType(StructType([
        StructField('array-field-1', StringType()),
        StructField('array-field-2', StringType())
    ])))
])
when {"id": "OK", "some-array": []}
with:
bad_schema = StructType([
    StructField('id', StringType()),
    StructField('some-array', ArrayType(StringType()))
])
Therefore one option is to read these two directories with different schemas.
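For illustration, a minimal sketch of that option, reusing the two schemas above; the directory paths are hypothetical placeholders:
# each directory is read with the schema that actually matches its files
good_df = spark.read.schema(good_schema).parquet("path/to/good_partition")
bad_df = spark.read.schema(bad_schema).parquet("path/to/bad_partition")
# the two DataFrames cannot be unioned directly, since the element type of
# some-array differs; the suggested solution below goes through a string column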
I don't understand why this fails. There should be no incompatibility caused by the schema.
As explained above the data is incompatible with the schema.
In case you wonder, collecting the data without specifying the schema works.
This is the expected behaviour: when no explicit schema is specified, Spark tries to discover it from the files it actually reads.
Suggested solution
The only solution I can think of is to treat the some-array field as a string. I don't know whether that is feasible in your system, but you could implement it by casting some-array to string for both schemas/partitions.
This conversion can be done in two steps. First, load the data of both partitions with their respective schemas:
good_data_df = spark.read.schema(good_schema).parquet(...)
bad_data_df = spark.read.schema(bad_schema).parquet(...)
Then cast the some-array field to string and save the results under one common directory:
from pyspark.sql.functions import col

good_data_df = good_data_df.withColumn("some-array", col("some-array").cast("string"))
bad_data_df = bad_data_df.withColumn("some-array", col("some-array").cast("string"))
good_data_df.union(bad_data_df).write.mode("overwrite").parquet("parquet_path")
Finally, you can load some-array as a string and convert it to array_schema using the from_json function:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
array_schema = ArrayType(StructType([
    StructField('array-field-1', StringType()),
    StructField('array-field-2', StringType())
]))

# we will use this generic schema for both partitions
generic_schema = StructType([
    StructField('id', StringType()),
    StructField('some-array', StringType())
])
parquet_path = "/tmp/60297547/parquet"
good_data = "{'id': 'OK', 'some-array': \"[{'array-field-1':'f1a','array-field-2':'f2a'},{'array-field-1':'f1b','array-field-2':'f2b'}]\"}"
bad_data = "{'id': 'OK', 'some-array': '[]'}"
# putting bad and good partitions into the same dataset where some-array is string
rdd = sc.parallelize([bad_data, good_data])
df = spark.read.json(rdd)
df.write.mode("overwrite").parquet(parquet_path)
final_df = spark.read.schema(generic_schema).parquet(parquet_path)
final_df = final_df.withColumn("some-array", from_json(final_df["some-array"], array_schema))
final_df.show(10, False)
# +---+------------------------+
# |id |some-array |
# +---+------------------------+
# |OK |[[f1a, f2a], [f1b, f2b]]|
# |OK |[] |
# +---+------------------------+
final_df.printSchema()
# root
# |-- id: string (nullable = true)
# |-- some-array: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- array-field-1: string (nullable = true)
# | | |-- array-field-2: string (nullable = true)