 

Spark - Reading many small parquet files gets status of each file beforehand

I have hundreds of thousands of smaller parquet files I'm attempting to read in with Spark on a regular basis. My application runs, but before the files are read in by the executor nodes, the driver node appears to be getting the status of each individual file. I read into it a bit, and this is necessary to infer the schema and partitions. I tried providing them like so:

import org.apache.spark.sql.execution.datasources.DataSource

sparkSession.baseRelationToDataFrame(
  DataSource
    .apply(
      sparkSession,
      paths = paths, // list of thousands of parquet file paths in S3
      partitionColumns = Seq("my_join_column"),
      userSpecifiedSchema = Some(schema),
      className = "parquet",
      options = Map.empty[String, String]
    )
    // Skip the per-file existence check, but resolveRelation still builds an
    // InMemoryFileIndex, which lists/stats every file.
    .resolveRelation(checkFilesExist = false)
)

But even when providing the schema and partition columns, it still takes a while beforehand. After looking into the resolveRelation code a bit, it looks like it still has to query the status of each file in order to build an InMemoryFileIndex.

Is there any way to get around this issue?

I'm using spark-sql 2.3.1.

Asked Nov 02 '18 by Sam


1 Answer

There is no good way to avoid this problem in the current Spark architecture.

A while back I collaborated with some Spark committers on a LazyBaseRelation design that can delay discovering file information until the number of partitions of a data source (as opposed to just its schema) must be known, which isn't technically necessary until an action has to run. We never completed that work, and even then, when the time came to execute an action, you'd take the hit.

There are four practical approaches to speeding the initial file discovery:

  1. Use a large cluster, as some aspects of file discovery are distributed (a config sketch follows this list). In some environments you can scale the cluster down once discovery is complete.
  2. Do the initial discovery before you need to use the data so that, hopefully, it's available by the time it's needed. We have petabytes of data in millions of large Parquet files with three levels of partitioning; we use a scheduled job to refresh the in-memory file index.
  3. If you are on Databricks, use Delta's OPTIMIZE to coalesce the small Parquet files into fewer, larger ones. Note that Delta costs extra.
  4. Implement the equivalent of OPTIMIZE yourself by rewriting subsets of the data (a sketch follows below). Whether you can do this easily or not depends on access patterns: you have to think about idempotence and consistency.
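
A note on (1): the distributed part of file listing is governed by a couple of Spark SQL settings. This is only a minimal sketch of tuning them when building the session; the numbers shown are the Spark 2.3 defaults and are illustrative, not recommendations:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bulk-parquet-read")
  // Above this many input paths, Spark lists files with a distributed job
  // on the executors instead of serially on the driver (default 32).
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")
  // Upper bound on the parallelism of that listing job (default 10000).
  .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "10000")
  .getOrCreate()

With thousands of explicit S3 paths you are already well above the threshold, so what actually helps is having more executors available during the listing phase.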
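
And for (4), a roll-your-own compaction job is essentially a read-repartition-write pass over a slice of the data. A rough sketch, assuming hypothetical S3 prefixes and a quiet window in which nothing else writes to that slice; idempotence and the final swap are up to you:

// Hypothetical locations; adjust to your layout.
val srcPath     = "s3://my-bucket/events/my_join_column=2018-11-02/"
val stagingPath = "s3://my-bucket/events_compacted/my_join_column=2018-11-02/"

spark.read
  .schema(schema)      // reuse the known schema to avoid inference over many files
  .parquet(srcPath)
  .repartition(8)      // aim for a handful of large output files
  .write
  .mode("overwrite")
  .parquet(stagingPath)

// Validate the staging output, then swap it in (repoint the partition's
// location, or replace srcPath out of band) so readers never see a partial state.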

Once initial discovery is done, caching the in-memory file list is your best friend. There are two ways of doing it:

  • Use the metastore, by registering your data as an external table (a DDL sketch follows after this list). Whether you can do this easily or not depends on data update patterns. If the data is naturally partitioned, you can add partitions using DDL and easily implement strategy (4) above.

  • Build your own table manager. This is what we did, as the metastore implementation had unacceptable restrictions on schema evolution. You'd have to decide on scoping: driver/JVM-scoped and SparkSession-scoped are the two obvious choices.
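
To make the first bullet concrete, here is a rough Spark SQL sketch of registering the data as a partitioned external table and maintaining its partitions with DDL; the table name, columns, and S3 locations are made up for illustration:

// Register the existing layout as a partitioned external table
// (the partition column must appear in the column list).
spark.sql("""
  CREATE TABLE IF NOT EXISTS events (id BIGINT, payload STRING, my_join_column STRING)
  USING parquet
  PARTITIONED BY (my_join_column)
  LOCATION 's3://my-bucket/events/'
""")

// One-shot discovery of everything already on storage...
spark.sql("MSCK REPAIR TABLE events")

// ...or register new partitions incrementally as they land.
spark.sql("""
  ALTER TABLE events ADD IF NOT EXISTS
  PARTITION (my_join_column = '2018-11-02')
  LOCATION 's3://my-bucket/events/my_join_column=2018-11-02/'
""")

// If files under an existing partition change, invalidate cached metadata.
spark.catalog.refreshTable("events")

// Reads now get the file list from the metastore instead of re-scanning S3.
val events = spark.table("events").where("my_join_column = '2018-11-02'")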

Good luck!

Answered Oct 13 '22 by Sim