I'm trying to import data in parquet format with a custom schema, but it returns: TypeError: option() missing 1 required positional argument: 'value'
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

ProductCustomSchema = StructType([
    StructField("id_sku", IntegerType(), True),
    StructField("flag_piece", StringType(), True),
    StructField("flag_weight", StringType(), True),
    StructField("ds_sku", StringType(), True),
    StructField("qty_pack", FloatType(), True)])

def read_parquet_(path, schema):
    return spark.read.format("parquet")\
        .option(schema)\
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")\
        .load(path)

product_nomenclature = 'C:/Users/alexa/Downloads/product_nomenc'
product_nom = read_parquet_(product_nomenclature, ProductCustomSchema)
PySpark SQL provides methods to read a Parquet file into a DataFrame and to write a DataFrame out to Parquet files: the parquet() functions on DataFrameReader and DataFrameWriter are used to read and to write/create Parquet files, respectively.
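For illustration, a minimal sketch of both calls, assuming an existing SparkSession named spark; the paths are placeholders:

# Read a Parquet file into a DataFrame (DataFrameReader.parquet)
df = spark.read.parquet("/tmp/input_data")

# Write a DataFrame out as a Parquet file (DataFrameWriter.parquet)
df.write.parquet("/tmp/output_data")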
It's all immutable. The problem when you need to edit the data is that these data structures are immutable. You can add partitions to Parquet files, but you can't edit the data in place.
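In practice that means updates arrive as new files or partitions rather than in-place edits. A hedged sketch, assuming a DataFrame named new_rows (hypothetical) with a dt partition column:

# Append adds new Parquet files/partitions; existing files are untouched
new_rows.write.mode("append").partitionBy("dt").parquet("/tmp/output_data")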
Self-describing: in addition to data, a Parquet file contains metadata, including its schema and structure. Each file stores both the data and the standard used for accessing each record, making it easier to decouple services that write, store, and read Parquet files.
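Because the schema travels with the file, a reader can recover it without any external contract. A small sketch, again assuming a live spark session and a placeholder path:

# Print the schema embedded in the Parquet footer
spark.read.parquet("/tmp/output_data").printSchema()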
As mentioned in the comments, you should change .option(schema) to .schema(schema). option() requires you to specify a key (the name of the option you're setting) and a value (the value you want to assign to that option). You are getting the TypeError because you were passing a variable called schema to option() without specifying which option you were actually trying to set with that variable.
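Applied to the function from the question, the fix looks like this (a sketch, assuming the same spark session):

def read_parquet_(path, schema):
    return spark.read.format("parquet")\
        .schema(schema)\
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")\
        .load(path)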
The QueryExecutionException you posted in the comments is being raised because the schema you've defined in your schema variable does not match the data in your DataFrame. If you're going to specify a custom schema, you must make sure that schema matches the data you are reading. In your example the column id_sku is stored as a BinaryType, but in your schema you're defining the column as an IntegerType. pyspark will not try to reconcile differences between the schema you provide and the actual types in the data, and an exception will be thrown.
To fix your error, make sure the schema you're defining correctly represents your data as it is stored in the parquet file (i.e. change the datatype of id_sku in your schema to BinaryType). The benefit of doing this is that you get a slight performance gain by not having to infer the file schema each time the parquet file is read.
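A sketch of the corrected schema, with id_sku matching the on-disk type described above:

from pyspark.sql.types import StructType, StructField, BinaryType, StringType, FloatType

# id_sku is declared as BinaryType to match how it is stored in the file
ProductCustomSchema = StructType([
    StructField("id_sku", BinaryType(), True),
    StructField("flag_piece", StringType(), True),
    StructField("flag_weight", StringType(), True),
    StructField("ds_sku", StringType(), True),
    StructField("qty_pack", FloatType(), True)])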