I have hundreds of thousands of small parquet files that I'm attempting to read with Spark on a regular basis. My application runs, but before the files are read in on the executor nodes, the driver node appears to be getting the status of each individual file. I read into it a bit, and this is necessary to infer the schema and partitions. I tried providing them like so:
sparkSession.baseRelationToDataFrame(
  DataSource
    .apply(
      sparkSession,
      paths = paths, // List of thousands of parquet files in S3
      partitionColumns = Seq("my_join_column"),
      userSpecifiedSchema = Some(schema),
      className = "parquet",
      options = Seq().toMap
    )
    .resolveRelation(checkFilesExist = false)
)
But even when providing the schema and partition columns, it takes a while beforehand. After looking into the resolveRelation code a bit, it looks like it still has to query the status of each file in order to build an InMemoryFileIndex.
Is there any way to get around this issue?
I'm using spark-sql 2.3.1.
Optimal file size for HDFS

Avoid file sizes that are smaller than the configured block size. An average file size below the recommended size adds more burden to the NameNode, causes heap/GC issues, and makes storage and processing inefficient. Files larger than the block size are potentially wasteful as well.

The main concern with small files on HDFS is scale: billions of small files cause problems, chiefly for the NameNode and for anything that has to list them. Spark may be an in-memory processing framework, but it still works when the data doesn't fit into memory; in such situations processing spills over to disk and is a bit slower.
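To make that sizing advice concrete, here is a minimal compaction sketch, assuming hypothetical S3 paths and a target file count that you would tune so the output files land near the block size; it is only a pattern, not the asker's actual job.

import org.apache.spark.sql.SparkSession

object CompactSmallParquetFiles {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("compact-small-parquet-files")
      .getOrCreate()

    val inputPath       = "s3a://my-bucket/my_events/raw/"       // hypothetical
    val outputPath      = "s3a://my-bucket/my_events/compacted/" // hypothetical
    val targetFileCount = 200 // tune so output files end up roughly block-sized

    // Read the many small files once, shuffle into fewer partitions, and
    // rewrite them so later jobs have far fewer objects to list.
    spark.read
      .parquet(inputPath)
      .repartition(targetFileCount)
      .write
      .mode("overwrite")
      .parquet(outputPath)

    spark.stop()
  }
}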
There is no good way to avoid this problem in the current Spark architecture.
A while back I collaborated with some Spark committers on a LazyBaseRelation design that can delay discovering file information until the number of partitions (as opposed to just the schema) of a data source must be known, which isn't technically necessary until an action has to run. But we never completed the work, and even then, when the time comes to execute an action, you'd take the hit.
There are four practical approaches to speeding up the initial file discovery:

1. Use Databricks Delta's OPTIMIZE to coalesce the small Parquet files into fewer, larger ones. Note that Delta costs extra.

2. Implement the equivalent of OPTIMIZE by yourself, rewriting subsets of the data. Whether you can do this easily or not depends on access patterns: you have to think about idempotence and consistency.

Once initial discovery is done, caching the in-memory file list is your best friend. There are two ways of doing it:

3. Use the metastore, by registering your data as an external table. Whether you can do this easily or not depends on data update patterns. If the data is naturally partitioned you can add partitions using DDL and you can easily implement strategy (4) above. (There is a sketch of this after the list.)

4. Build your own table manager. This is what we did, as the metastore implementation had unacceptable restrictions on schema evolution. You'd have to decide on scoping: driver/JVM and SparkSession are the two obvious choices. (There is a sketch of this after the list, too.)
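Here is a minimal sketch of strategy (3), assuming a Hive-backed metastore and hypothetical table, column, and path names (only my_join_column comes from the question). The point is that once the table and its partitions are registered, reads resolve files through the catalog instead of walking S3 on every query.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("register-parquet-as-external-table")
  .enableHiveSupport()
  .getOrCreate()

// Register the existing Parquet data as an external, partitioned table.
spark.sql("""
  CREATE TABLE IF NOT EXISTS my_events (
    id BIGINT,
    payload STRING,
    my_join_column STRING
  )
  USING PARQUET
  PARTITIONED BY (my_join_column)
  LOCATION 's3a://my-bucket/my_events/'
""")

// Discover the partitions that already exist on storage (one-time cost).
spark.sql("MSCK REPAIR TABLE my_events")

// As new data lands, add its partition with DDL instead of re-listing everything.
spark.sql("ALTER TABLE my_events ADD IF NOT EXISTS PARTITION (my_join_column = '2018-08-01')")

// Reads now go through the metastore-backed catalog.
val df = spark.table("my_events").where("my_join_column = '2018-08-01'")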
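And here is a toy, SparkSession-scoped sketch of strategy (4). It is not the implementation described above, just an illustration of the idea: resolve each path once, keep the resulting DataFrame (and therefore its cached file listing) around, and invalidate it explicitly when the underlying data changes.

import scala.collection.concurrent.TrieMap
import org.apache.spark.sql.{DataFrame, SparkSession}

class ParquetTableManager(spark: SparkSession) {
  private val cache = TrieMap.empty[String, DataFrame]

  // The first call for a path pays the file-listing cost; later calls reuse
  // the already-resolved relation and skip the S3 walk.
  def table(path: String): DataFrame =
    cache.getOrElseUpdate(path, spark.read.parquet(path))

  // Drop the cached listing after rewriting or adding files under `path`.
  def invalidate(path: String): Unit = cache.remove(path)
}

A driver/JVM-scoped variant is the same idea with the cache held in a singleton object instead of per SparkSession.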
Good luck!