get size of parquet file in HDFS for repartition with Spark in Scala

I have many Parquet file directories on HDFS, each containing a few thousand small Parquet files (most are under 100 KB). They slow down my Spark job, so I want to combine them.

With the following code I can repartition a local Parquet file into a smaller number of parts:

val pqFile = sqlContext.read.parquet("file:/home/hadoop/data/file.parquet")
pqFile.coalesce(4).write.save("file:/home/hadoop/data/fileSmaller.parquet")

But I don't know how to get the size of an HDFS directory programmatically from Scala, so I can't work out the number of partitions to pass to coalesce for the real data set.

How can I do this? Or is there a convenient way within Spark to configure the writer to write fixed-size Parquet partitions?

asked Nov 29 '15 by Bamqf

1 Answer

You could try

pqFile.inputFiles.size

which returns "a best-effort snapshot of the files that compose this DataFrame" according to the documentation.
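
Counting the input files already gives you a rough handle on the partition count. A hedged sketch (the figure of ~200 small input files per output part and the output path are illustrative assumptions, not recommendations):

// Illustrative assumption: roughly 200 of the small input files per output part
val filesPerPart = 200
val numParts = math.max(1, pqFile.inputFiles.length / filesPerPart)
pqFile.coalesce(numParts).write.save("file:/home/hadoop/data/fileSmaller.parquet")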

As an alternative, directly on the HDFS level:

// Get a handle on the default filesystem from the Hadoop configuration
val hdfs: org.apache.hadoop.fs.FileSystem =
  org.apache.hadoop.fs.FileSystem.get(
    new org.apache.hadoop.conf.Configuration())

val hadoopPath = new org.apache.hadoop.fs.Path("hdfs://localhost:9000/tmp")
val recursive = false
val ri = hdfs.listFiles(hadoopPath, recursive)

// Wrap Hadoop's RemoteIterator in a Scala Iterator
val it = new Iterator[org.apache.hadoop.fs.LocatedFileStatus]() {
  override def hasNext = ri.hasNext
  override def next() = ri.next()
}

// Materialize the iterator
val files = it.toList
println(files.size)
println(files.map(_.getLen).sum)

This way you get the file sizes as well.
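
To turn the summed byte count into a coalesce argument, one option is to divide by a target partition size. A minimal sketch, assuming a target of roughly 128 MB per output partition (the target size and the paths are illustrative, not prescribed):

// Illustrative assumption: aim for roughly 128 MB per output partition
val targetPartitionBytes = 128L * 1024 * 1024
val totalBytes = files.map(_.getLen).sum
// Round up so the last partition is not oversized
val numParts = math.max(1, ((totalBytes + targetPartitionBytes - 1) / targetPartitionBytes).toInt)

val pqDir = sqlContext.read.parquet("hdfs://localhost:9000/tmp")
pqDir.coalesce(numParts).write.save("hdfs://localhost:9000/tmpSmaller.parquet")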

answered Sep 22 '22 by Beryllium