I have a scenario where a number of operations, including a group by, have to be applied on a number of small (~300MB each) files. The operation looks like this:
df.groupBy(....).agg(....)
Now, to process multiple files, I can use a wildcard "/**/*.csv"; however, that creates a single RDD and partitions it for the operations. Since the operation is a group by, it involves a lot of shuffle, which is unnecessary if the files are mutually exclusive.
What I am looking for is a way to create independent RDDs from the files and operate on them independently.
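For reference, my current approach looks roughly like this (the column names and read options below are just placeholders for my actual ones):

import org.apache.spark.sql.functions.sum

val df = sqlContext
  .read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/data/**/*.csv")             // all files end up in a single DataFrame

df.groupBy("key").agg(sum("value"))   // one job, with a full shuffle across all files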
It is more an idea than a full solution and I haven't tested it yet.
You can start by extracting your data processing pipeline into a function:
def pipeline(f: String, n: Int) = {
  sqlContext
    .read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .load(f)
    .repartition(n)
    .groupBy(...)
    .agg(...)
    .cache // Cache so we can force computation later
}
If your files are small, you can adjust the n parameter to use as small a number of partitions as needed to fit the data from a single file and avoid shuffling. This limits concurrency, but we'll get back to that issue later.
val n: Int = ???
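As an illustration, one way to pick n is to derive it from the file size; the 128MB target below is just an assumption, not anything mandated by Spark:

// Rough heuristic: enough partitions to keep each one under targetBytes
def partitionsFor(fileSizeBytes: Long, targetBytes: Long = 128L * 1024 * 1024): Int =
  math.max(1, math.ceil(fileSizeBytes.toDouble / targetBytes).toInt)

val n: Int = partitionsFor(300L * 1024 * 1024)  // ~3 partitions for a 300MB file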
Next, you have to obtain a list of input files. This step depends on the data source, but most of the time it is more or less straightforward:
val files: Array[String] = ???
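For example, if the files live on a Hadoop-compatible filesystem, you could list them with the FileSystem API (the /data directory below is a placeholder):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val files: Array[String] = fs
  .listStatus(new Path("/data"))                // list the input directory
  .filter(_.getPath.getName.endsWith(".csv"))   // keep only CSV files
  .map(_.getPath.toString)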
Next, you can map the above list using the pipeline function:
val rdds = files.map(f => pipeline(f, n))
Since we limit concurrency at the level of a single file, we want to compensate by submitting multiple jobs. Let's add a simple helper which forces evaluation and wraps it in a Future:
import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
  df.rdd.foreach(_ => ()) // Force computation
  df
}
Finally, we can use the above helper on the rdds:
val result = Future.sequence(
  rdds.map(rdd => pipelineToFuture(rdd)).toList
)
Depending on your requirements, you can add onComplete callbacks or use reactive streams to collect the results.
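For instance, you could either block until all jobs have finished or attach a callback (the timeout below is arbitrary):

import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Option 1: block until every per-file job is done
val dfs = Await.result(result, 1.hour)

// Option 2: react asynchronously instead of blocking
result.onComplete {
  case Success(frames) => frames.foreach(_.show())
  case Failure(e)      => e.printStackTrace()
}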
If you have many files, and each file is small (you say ~300MB above, which I would count as small for Spark), you could try using SparkContext.wholeTextFiles, which will create an RDD where each record is an entire file.
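A minimal sketch of that idea, assuming every file shares the same header; the path and the per-file logic below are placeholders:

// Each record is (filePath, fileContents), so a whole file stays on one executor
val perFile = sc.wholeTextFiles("/data/*.csv")

val countsPerFile = perFile.map { case (path, contents) =>
  // Toy per-file aggregation: count non-empty data lines after the header.
  // Real per-file groupBy/agg logic would go here instead; no shuffle is needed
  // because each file is processed independently.
  (path, contents.split("\n").drop(1).count(_.nonEmpty))
}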