Handling empty arrays in pySpark (optional binary element (UTF8) is not a group)

I have a JSON-like structure in Spark which looks as follows:

>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string

Depending on the partition, some-array might be an empty array for all ids. When this happens, Spark infers the following schema:

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string

Of course that's a problem if I want to read multiple partitions, because Spark cannot merge the schemas. I have tried to define the schema manually, so there should be no problem:

>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string

So far so good, but when I try to actually collect the data I get an error:

>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group

I don't understand why this fails. There should be no incompatibility caused by the schema. In case you wonder, collecting the data without specifying the schema works:

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string   # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works

Edit

Here is a reproducible example in Python:

from pyspark.sql.types import *


myschema = StructType([
    StructField('id', StringType()),
    StructField('some-array', ArrayType(StructType([
        StructField('array-field-1', StringType()),
        StructField('array-field-2', StringType())
    ])))
])

path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'
dfKO = sc.parallelize([jsonKO])
dfKO = spark.read.json(dfKO)
dfKO.write.parquet(path_writeKO) # write without schema

read_error = spark.read.schema(myschema).parquet(path_writeKO) # read with schema
read_error.collect() # Fails!!
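
For contrast, the good case from above can be reproduced the same way. This is only a hypothetical counterpart (the path name and sample record are made up); it writes a non-empty array and reads back fine with the same schema:

path_writeOK = "path/to/parquet_ok"  # hypothetical path for the 'good' partition
jsonOK = '{"id": "OK", "some-array": [{"array-field-1": "f1", "array-field-2": "f2"}]}'
dfOK = spark.read.json(sc.parallelize([jsonOK]))
dfOK.write.parquet(path_writeOK)  # inferred schema matches myschema here

read_ok = spark.read.schema(myschema).parquet(path_writeOK)
read_ok.collect()  # works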

asked Feb 19 '20 by lsmor

2 Answers

The solution I've found is setting the option dropFieldIfAllNull to True when reading the JSON file. This makes fields that contain only null or empty-array values disappear, which makes schema merging easier.

>>> jsonKO = '{"id": "OK", "some-array": []}'
>>> dfKO = sc.parallelize([jsonKO])
>>> dfKO = spark.read.option('dropFieldIfAllNull', True).json(dfKO)
id: string

Now the undesired type inference won't apply, and when reading multiple partitions of the same dataset the option mergeSchema will be able to read all files without collision.
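
As a rough sketch (the paths here are placeholders, not from the original post): once the partitions written from the cleaned JSON no longer carry the mistyped empty column, a merged read resolves the missing field to null for those rows while keeping the struct element type from the partitions that have it.

df_all = (spark.read
               .option("mergeSchema", True)
               .parquet("path/to/partition_a", "path/to/partition_b"))  # placeholder partition paths
df_all.printSchema()  # some-array: array<struct<array-field-1, array-field-2>>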

answered by lsmor

Of course that's a problem if I want to read multiple partitions, because Spark cannot merge the schemas. I have tried to define the schema manually, so there should be no problem

I am afraid there is no single schema that can parse both of these cases simultaneously. The data {"id": "OK", "some-array": [{"array-field-1":"f1", "array-field-2":"f2"}]} can only be parsed with:

good_schema = StructType([
  StructField('id', StringType()), 
  StructField( 'some-array', 
              ArrayType(StructType([
                StructField('array-field-1', StringType()),
                StructField('array-field-2', StringType())
              ])
          ))
  ])

while {"id": "OK", "some-array": []} can only be parsed with:

bad_schema = StructType([
  StructField('id', StringType()), 
  StructField('some-array', ArrayType(StringType()))
])

Therefore one option is to read these two directories with different schemas.

I don't understand why this fails. There should be no incompatibility caused by the schema.

As explained above, the data is incompatible with the schema.

In case you wonder, collecting the data without specifying the schema works.

This is the expected behaviour: when no explicit schema is specified, Spark will try to infer one from the data.

Suggested solution

The only solution I can think of is to treat the some-array field as a string. I don't know if this is feasible in your system, although you could implement it by casting some-array into a string for both schemas/partitions.

This conversion can be done in at least two ways, given:

from pyspark.sql.functions import col

good_data_df = spark.read.schema(good_schema).parquet(...)
bad_data_df = spark.read.schema(bad_schema).parquet(...)

1. Read both datasets, convert the some-array field to a string, and save the results under one common directory:

good_data_df = good_data_df.withColumn("some-array", col("some-array").cast("string"))
bad_data_df = bad_data_df.withColumn("some-array", col("some-array").cast("string"))

good_data_df.union(bad_data_df).write.mode("overwrite").parquet("parquet_path")

2. Execute the above conversion at runtime, skipping the re-write step (see the sketch after this list).
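
For the second option, here is a minimal sketch of how the runtime-only conversion could look. This is my illustration rather than code from the post: the paths are placeholders, good_schema and bad_schema are the ones defined above, and the good rows are serialised with to_json (instead of a plain cast) so that from_json can parse the string back.

from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import col, to_json, from_json

# same element schema as in the full example below
array_schema = ArrayType(StructType([
    StructField('array-field-1', StringType()),
    StructField('array-field-2', StringType())]))

good_data_df = spark.read.schema(good_schema).parquet("good_partition_path")  # placeholder paths
bad_data_df = spark.read.schema(bad_schema).parquet("bad_partition_path")

# Serialise some-array to a JSON string on both sides so the two schemas line up.
# The bad partitions only ever contain empty arrays, so a plain string cast
# already yields valid JSON ("[]") for them.
good_str = good_data_df.withColumn("some-array", to_json(col("some-array")))
bad_str = bad_data_df.withColumn("some-array", col("some-array").cast("string"))

# Union the two and parse the string back into the struct element type.
final_df = good_str.union(bad_str) \
                   .withColumn("some-array", from_json(col("some-array"), array_schema))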

Finally, you can load some-array as a string and then convert it to array_schema using the from_json function:

from pyspark.sql.types import *
from pyspark.sql.functions import from_json

array_schema = ArrayType(StructType([
                StructField('array-field-1', StringType()),
                StructField('array-field-2', StringType())]))

# we will use this for both partitions
generic_schema = StructType([
  StructField('id', StringType()), 
  StructField('some-array', StringType())
])

parquet_path = "/tmp/60297547/parquet"
good_data = "{'id': 'OK', 'some-array': \"[{'array-field-1':'f1a','array-field-2':'f2a'},{'array-field-1':'f1b','array-field-2':'f2b'}]\"}"
bad_data = "{'id': 'OK', 'some-array': '[]'}"

# putting bad and good partitions into the same dataset where some-array is string
rdd = sc.parallelize([bad_data, good_data])
df = spark.read.json(rdd)
df.write.mode("overwrite").parquet(parquet_path)

final_df = spark.read.schema(generic_schema).parquet(parquet_path)
final_df = final_df.withColumn("some-array", from_json(final_df["some-array"], array_schema))

final_df.show(10, False)

# +---+------------------------+
# |id |some-array              |
# +---+------------------------+
# |OK |[[f1a, f2a], [f1b, f2b]]|
# |OK |[]                      |
# +---+------------------------+

final_df.printSchema()
# root
#  |-- id: string (nullable = true)
#  |-- some-array: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- array-field-1: string (nullable = true)
#  |    |    |-- array-field-2: string (nullable = true)
answered by abiratsis