Processing multiple files as independent RDD's in parallel

I have a scenario where a number of operations, including a group by, have to be applied to a number of small (~300MB each) files. The operation looks like this:

df.groupBy(...).agg(...)
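
For concreteness, a minimal version of that operation might look like this (the column names are hypothetical):

import org.apache.spark.sql.functions.sum

// Hypothetical schema: group the rows by "key" and sum "value" within each group
val aggregated = df.groupBy("key").agg(sum("value"))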

Now, to process multiple files I can use a wildcard such as "/**/*.csv"; however, that creates a single RDD and partitions it for the operations. The trouble is that the operations include a group by, which involves a lot of shuffling that is unnecessary if the files are mutually exclusive.

What I am looking for is a way to create independent RDDs from the files and operate on them independently.

asked Aug 10 '15 by Love Hasija


2 Answers

It is more an idea than a full solution and I haven't tested it yet.

You can start by extracting your data processing pipeline into a function.

def pipeline(f: String, n: Int): org.apache.spark.sql.DataFrame = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n) // Repartition each file's data into n partitions
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

If your files are small, you can adjust the n parameter to use as small a number of partitions as possible while still fitting the data from a single file, and so avoid shuffling. This means limiting concurrency, but we'll get back to that issue later.

val n: Int = ??? 
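
For ~300MB files a single partition per file is a plausible starting point, though the right value depends on your executors and the data; purely as an illustration:

val n: Int = 1 // illustrative: keep each ~300MB file in one partition to avoid shuffling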

Next you have to obtain a list of input files. This step depends on the data source, but most of the time it is more or less straightforward:

val files: Array[String] = ???
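
For example, if the files live on HDFS or another Hadoop-compatible filesystem, one way is to list them through the Hadoop FileSystem API (the directory path here is hypothetical):

import org.apache.hadoop.fs.{FileSystem, Path}

// List the CSV files in the input directory via the Hadoop FileSystem API
val fs = FileSystem.get(sc.hadoopConfiguration)
val files: Array[String] = fs
    .listStatus(new Path("/data/input"))
    .map(_.getPath.toString)
    .filter(_.endsWith(".csv"))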

Next you can map over the above list using the pipeline function:

val dfs = files.map(f => pipeline(f, n))

Since we limit concurrency at the level of a single file, we want to compensate by submitting multiple jobs. Let's add a simple helper which forces evaluation and wraps it in a Future:

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
    df.rdd.foreach(_ => ()) // Force computation by touching every partition
    df
}

Finally we can use the above helper on the DataFrames:

val result = Future.sequence(
    dfs.map(df => pipelineToFuture(df)).toList
)

Depending on your requirements, you can add onComplete callbacks or use reactive streams to collect the results.
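
For example, a blocking variant that simply waits for all the jobs to finish (the timeout is illustrative):

import scala.concurrent.Await
import scala.concurrent.duration._

// Block until every per-file job has completed (or the timeout expires)
val processed = Await.result(result, 1.hour)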

answered Nov 15 '22 by zero323


If you have many files, and each file is small (you say ~300MB above, which I would count as small for Spark), you could try SparkContext.wholeTextFiles, which creates an RDD where each record is an entire file.
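
A minimal sketch of that approach (the path and the per-file logic are illustrative):

// Each record is a (filePath, fileContent) pair, so per-file work needs no shuffle
val perFile = sc.wholeTextFiles("/data/input/*.csv")
val lineCounts = perFile.mapValues(content => content.split("\n").length)

Note that each file is read as a single record, so it has to fit comfortably in memory on one executor.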

answered Nov 15 '22 by mattinbits