
Issue parsing a MongoDB collection that has multiple schemas in Spark

I'm moving data from one collection to another collection in a different cluster using Spark. The data's schema is not consistent: a single collection contains several schemas, with different data types and small variations between documents. When I try to read the data with Spark, the sampling step is unable to pick up all of the schemas in the data and throws the error below. (The schema is complex, so I can't specify it explicitly; I rely on Spark to infer it by sampling.)

com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast ARRAY into a NullType (value: BsonArray{values=[{ "type" : "GUEST_FEE", "appliesPer" : "GUEST_PER_NIGHT", "description" : null, "minAmount" : 33, "maxAmount" : 33 }]})

I tried reading the collection as an RDD and writing it back as an RDD, but the issue still persists.

Any help on this would be appreciated!

Thanks.

asked Jun 20 '18 by knowledge_seeker

People also ask

Does Spark support MongoDB?

The MongoDB Connector for Spark provides integration between MongoDB and Apache Spark. Version 10.x of the MongoDB Connector for Spark is an all-new connector based on the latest Spark API. Install and migrate to version 10.x.

What is Pyspark schema?

A Spark schema is the structure of a DataFrame or Dataset. You can define it using the StructType class, which is a collection of StructField objects that define the column name (String), column type (DataType), whether the column is nullable (Boolean), and metadata (MetaData).
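For example, a minimal schema definition in PySpark might look like the following sketch; the column names, sample row, and the spark SparkSession are illustrative, not taken from the question:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Illustrative schema: a nullable string column and a non-nullable integer column
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), False)
])

# Build a small DataFrame with that explicit schema and show it
df = spark.createDataFrame([("Alice", 30)], schema=schema)
df.printSchema()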

What is Spark and MongoDB?

Apache Spark is a powerful processing engine designed for speed, ease of use, and sophisticated analytics. Spark particularly excels when fast performance is required. MongoDB is a popular NoSQL database that enterprises rely on for real-time analytics from their operational data.


1 Answer

All these com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast SOME_TYPE into a NullType errors come from incorrect schema inference. For schema-less data sources such as JSON files or MongoDB, Spark scans a small fraction of the data to determine the types. If a particular field has lots of NULLs, you can get unlucky and its type will be inferred as NullType.

One thing you can do is increase the number of entries scanned for schema inference.
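With the MongoDB Spark connector, the number of documents sampled is controlled by the sampleSize read option (the default is 1000). The sketch below assumes the 2.x connector; the URI, database, and collection names are placeholders:

# Sketch: sample more documents during schema inference so rarely populated
# fields are seen with their real types. The URI is a placeholder.
df = sqlContext.read \
    .format("com.mongodb.spark.sql") \
    .option("uri", "mongodb://host:27017/mydb.mycollection") \
    .option("sampleSize", 100000) \
    .load()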

Another is to get the inferred schema first, fix it, and reload the DataFrame with the fixed schema:

import pyspark.sql.types

def fix_spark_schema(schema):
  # Walk the schema recursively and replace every NullType with StringType
  if schema.__class__ == pyspark.sql.types.StructType:
    return pyspark.sql.types.StructType([fix_spark_schema(f) for f in schema.fields])
  if schema.__class__ == pyspark.sql.types.StructField:
    return pyspark.sql.types.StructField(schema.name, fix_spark_schema(schema.dataType), schema.nullable)
  if schema.__class__ == pyspark.sql.types.NullType:
    return pyspark.sql.types.StringType()
  return schema

# First pass: read only to obtain the inferred schema
collection_schema = sqlContext.read \
    .format("com.mongodb.spark.sql") \
    .options(...) \
    .load() \
    .schema

# Second pass: reload the data using the corrected schema
collection = sqlContext.read \
    .format("com.mongodb.spark.sql") \
    .options(...) \
    .load(schema=fix_spark_schema(collection_schema))

In my case all problematic fields could be represented with StringType; you can make the logic more complex if needed.
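If the NullType shows up inside an array (as in the error above, where an ARRAY could not be cast into a NullType), the same idea can be extended to recurse into ArrayType and MapType element types as well. This deeper variant is my own sketch under that assumption, not part of the original answer:

from pyspark.sql import types as T

def fix_spark_schema_deep(schema):
    # Recurse into structs, arrays and maps; replace any NullType with StringType
    if isinstance(schema, T.StructType):
        return T.StructType([fix_spark_schema_deep(f) for f in schema.fields])
    if isinstance(schema, T.StructField):
        return T.StructField(schema.name, fix_spark_schema_deep(schema.dataType), schema.nullable)
    if isinstance(schema, T.ArrayType):
        return T.ArrayType(fix_spark_schema_deep(schema.elementType), schema.containsNull)
    if isinstance(schema, T.MapType):
        return T.MapType(fix_spark_schema_deep(schema.keyType),
                         fix_spark_schema_deep(schema.valueType),
                         schema.valueContainsNull)
    if isinstance(schema, T.NullType):
        return T.StringType()
    return schema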

answered Nov 02 '22 by vlyubin