 

Read all Parquet files saved in a folder via Spark

I have a folder containing Parquet files. Something like this:

scala> val df = sc.parallelize(List(1,2,3,4)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.write.parquet("/tmp/test/df/1.parquet")

scala> val df = sc.parallelize(List(5,6,7,8)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.write.parquet("/tmp/test/df/2.parquet")

After saving the DataFrames, when I try to read all the Parquet files in the df folder, I get an error.

scala> val read = spark.read.parquet("/tmp/test/df")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425)
  ... 48 elided

I know I can read a Parquet file by giving its full path, but it would be better if there were a way to read all the Parquet files in a folder.

asked Mar 27 '17 by himanshuIIITian



2 Answers

Spark doesn't write/read parquet the way you think it does.

It uses the Hadoop libraries to write/read partitioned Parquet files.

Thus your first Parquet "file" is actually the directory /tmp/test/df/1.parquet/, where 1.parquet is a directory. This means that when reading Parquet data you need to provide the path to that directory (or the path to a single file, if you have just one).

val df = spark.read.parquet("/tmp/test/df/1.parquet/")

I advise you to read the official documentation for more details (cf. SQL Programming Guide - Parquet Files).
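For illustration, the on-disk layout produced by the writes above looks roughly like this (the part-file names are generated by Spark and will differ on your machine):

/tmp/test/df/1.parquet/_SUCCESS
/tmp/test/df/1.parquet/part-00000-xxxx.snappy.parquet
/tmp/test/df/2.parquet/_SUCCESS
/tmp/test/df/2.parquet/part-00000-xxxx.snappy.parquet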

EDIT:

You must be looking for something like this:

scala> sqlContext.range(1,100).write.save("/tmp/test/df/1.parquet")

scala> sqlContext.range(100,500).write.save("/tmp/test/df/2.parquet")

scala> val df = sqlContext.read.load("/tmp/test/df/*")
// df: org.apache.spark.sql.DataFrame = [id: bigint]

scala> df.show(3)
// +---+
// | id|
// +---+
// |400|
// |401|
// |402|
// +---+
// only showing top 3 rows

scala> df.count
// res3: Long = 499

You can also use wildcards in your file path URIs.
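For instance, reusing the directory names from above, a glob matching both sub-directories could look like this (the pattern itself is just an illustration):

scala> val dfAll = sqlContext.read.load("/tmp/test/df/*.parquet")
// dfAll: org.apache.spark.sql.DataFrame = [id: bigint]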

And you can provide multiple file paths as follows:

scala> val df2 = sqlContext.read.load("/tmp/test/df/1.parquet","/tmp/test/df/2.parquet")
// df2: org.apache.spark.sql.DataFrame = [id: bigint]

scala> df2.count
// res5: Long = 499
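The session-based API in Spark 2.x accepts multiple paths in the same way; a small sketch, reusing the paths from above:

scala> val df3 = spark.read.parquet("/tmp/test/df/1.parquet", "/tmp/test/df/2.parquet")
// df3: org.apache.spark.sql.DataFrame = [id: bigint]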
answered Sep 29 '22 by eliasah


You can also write the data into one folder rather than into separate Spark "files" (which are in fact folders) like 1.parquet, 2.parquet, etc. If you don't set a file name but only a path, Spark will put the files into the folder as real files (not folders) and name them automatically.

df1.write.partitionBy("countryCode").format("parquet").mode("overwrite").save("/tmp/data1/")
df2.write.partitionBy("countryCode").format("parquet").mode("append").save("/tmp/data1/")
df3.write.partitionBy("countryCode").format("parquet").mode("append").save("/tmp/data1/")

Then we can read the data from all files in the data folder:

val df = spark.read.format("parquet").load("/tmp/data1/")
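Because the data was partitioned by countryCode, a filter on that column at read time lets Spark prune partitions; a minimal sketch, where "US" is just a hypothetical partition value:

val dfUS = spark.read.format("parquet").load("/tmp/data1/").where("countryCode = 'US'")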
answered Sep 29 '22 by Ihor Konovalenko