Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark read parquet with custom schema

I'm trying to import data with parquet format with custom schema but it returns : TypeError: option() missing 1 required positional argument: 'value'

   ProductCustomSchema = StructType([
        StructField("id_sku", IntegerType(), True),
        StructField("flag_piece", StringType(), True),
        StructField("flag_weight", StringType(), True),
        StructField("ds_sku", StringType(), True),
        StructField("qty_pack", FloatType(), True)])

def read_parquet_(path, schema) : 
    return spark.read.format("parquet")\
                             .option(schema)\
                             .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")\
                             .load(path)

product_nomenclature = 'C:/Users/alexa/Downloads/product_nomenc'
product_nom = read_parquet_(product_nomenclature, ProductCustomSchema)
like image 352
user9176398 Avatar asked Sep 18 '18 12:09

user9176398


People also ask

How do I read the schema of a Parquet file in Pyspark?

Pyspark SQL provides methods to read Parquet file into DataFrame and write DataFrame to Parquet files, parquet() function from DataFrameReader and DataFrameWriter are used to read from and write/create a Parquet file respectively.

Can we change the schema of Parquet file?

It's all immutable. The problem we have when we need to edit the data is that our data structures are immutable. You can add partitions to Parquet files, but you can't edit the data in place.

Do Parquet files contain schema?

Self-describing: In addition to data, a Parquet file contains metadata including schema and structure. Each file stores both the data and the standards used for accessing each record – making it easier to decouple services that write, store, and read Parquet files.


1 Answers

As mentioned in the comments you should change .option(schema) to .schema(schema). option() requires you to specify a key (the name of the option you're setting) and a value (what value you want to assign to that option). You are getting the TypeError because you were just passing a variable called schema to option without specifying what that option you were actually trying to set with that variable.

The QueryExecutionException you posted in the comments is being raised because the schema you've defined in your schema variable does not match the data in your DataFrame. If you're going to specify a custom schema you must make sure that schema matches the data you are reading. In your example the column id_sku is stored as a BinaryType, but in your schema you're defining the column as an IntegerType. pyspark will not try to reconcile differences between the schema you provide and what the actual types are in the data and an exception will be thrown.

To fix your error make sure the schema you're defining correctly represents your data as it is stored in the parquet file (i.e. change the datatype of id_sku in your schema to be BinaryType). The benefit to doing this is you get a slight performance gain by not having to infer the file schema each time the parquet file is read.

like image 103
vielkind Avatar answered Oct 26 '22 07:10

vielkind