I have a folder containing Parquet files. Something like this:
scala> val df = sc.parallelize(List(1,2,3,4)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.write.parquet("/tmp/test/df/1.parquet")
scala> val df = sc.parallelize(List(5,6,7,8)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.write.parquet("/tmp/test/df/2.parquet")
After saving the DataFrames, when I try to read all the Parquet files in the df folder, I get this error:
scala> val read = spark.read.parquet("/tmp/test/df")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425)
... 48 elided
I know I can read a Parquet file by giving its full path, but it would be better if there were a way to read all the Parquet files in a folder.
Spark doesn't write/read Parquet the way you think it does.
It uses the Hadoop library to write/read partitioned Parquet files.
Thus your first Parquet "file" lives under the path /tmp/test/df/1.parquet/, where 1.parquet is a directory. This means that when reading Parquet you need to provide the path to your Parquet directory, or the path to the file if it is a single file.
val df = spark.read.parquet("/tmp/test/df/1.parquet/")
I advise you to read the official documentation for more details. [cf. SQL Programming Guide - Parquet Files]
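For reference, a write like df.write.parquet("/tmp/test/df/1.parquet") typically produces a directory whose contents look roughly like this (the part-file names contain a generated suffix and vary from run to run, so treat this as an illustration only):

/tmp/test/df/1.parquet/_SUCCESS
/tmp/test/df/1.parquet/part-00000-xxxxxxxx.snappy.parquet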
EDIT:
You must be looking for something like this:
scala> sqlContext.range(1,100).write.save("/tmp/test/df/1.parquet")
scala> sqlContext.range(100,500).write.save("/tmp/test/df/2.parquet")
scala> val df = sqlContext.read.load("/tmp/test/df/*")
// df: org.apache.spark.sql.DataFrame = [id: bigint]
scala> df.show(3)
// +---+
// | id|
// +---+
// |400|
// |401|
// |402|
// +---+
// only showing top 3 rows
scala> df.count
// res3: Long = 499
You can also use wildcards in your file path URIs.
And you can provide multiple file paths, as follows:
scala> val df2 = sqlContext.read.load("/tmp/test/df/1.parquet","/tmp/test/df/2.parquet")
// df2: org.apache.spark.sql.DataFrame = [id: bigint]
scala> df2.count
// res5: Long = 499
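The same thing works with the Parquet reader from your original snippet; spark.read.parquet also accepts multiple paths (the paths below are just the ones from the example above, and df3 is just a new name for illustration):

scala> val df3 = spark.read.parquet("/tmp/test/df/1.parquet", "/tmp/test/df/2.parquet")
// df3: org.apache.spark.sql.DataFrame = [id: bigint]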
You can also write data into a folder without creating separate Spark "files" (which are in fact folders) such as 1.parquet, 2.parquet, etc.
If you don't set a file name but only a path, Spark will put files into that folder as real files (not folders) and name those files automatically.
df1.write.partitionBy("countryCode").format("parquet").mode("overwrite").save("/tmp/data1/")
df2.write.partitionBy("countryCode").format("parquet").mode("append").save("/tmp/data1/")
df3.write.partitionBy("countryCode").format("parquet").mode("append").save("/tmp/data1/")
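With partitionBy("countryCode"), the output folder gets one sub-directory per distinct country value, and the auto-generated part files live inside those sub-directories. The layout ends up looking roughly like this (countryCode=US and countryCode=FR are just assumed example values, and the part-file names vary):

/tmp/data1/countryCode=US/part-00000-xxxxxxxx.snappy.parquet
/tmp/data1/countryCode=FR/part-00000-xxxxxxxx.snappy.parquet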
Then we can read the data from all files in the data folder:
val df = spark.read.format("parquet").load("/tmp/data1/")
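Because the data was partitioned by countryCode, that column is reconstructed from the directory names when reading back, and a filter on it only has to scan the matching sub-folders. A minimal sketch (the value "US" is just an assumed example):

val usOnly = df.filter(df("countryCode") === "US")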