Scala Spark: Split collection into several RDD?

Is there any Spark function that allows splitting a collection into several RDDs according to some criteria? Such a function would avoid excessive iteration. For example:

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  val logFile = "file.txt"
  val conf = new SparkConf().setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val logData = sc.textFile(logFile, 2).cache()
  logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
  logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
}

In this example I have to iterate over `logData` twice just to write the results to two separate files:

    logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
    logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")

It would be nice instead to have something like this:

    val resultMap = logData.map { line =>
      if (line.contains("a")) ("a", line)
      else if (line.contains("b")) ("b", line)
      else ("-", line)
    }
    resultMap.writeByKey("a", "linesA.txt")
    resultMap.writeByKey("b", "linesB.txt")

Any such thing?
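
Update: the closest thing I've found so far is routing output files by key through Hadoop's MultipleTextOutputFormat, so I'll leave a sketch here for reference (the subclass name RDDMultipleTextOutputFormat is my own, not a Spark API; it reuses resultMap from the snippet above):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

// Routes each (key, value) pair to an output file named after its key.
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Use the key only for the file name, not in the output line itself.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

// partitionBy sends all records with the same key to one task, so only one
// task ever creates each key's file. (On Spark before 1.3 this also needs
// `import org.apache.spark.SparkContext._` for the pair-RDD implicits.)
resultMap
  .partitionBy(new HashPartitioner(2))
  .saveAsHadoopFile("lines", classOf[String], classOf[String],
    classOf[RDDMultipleTextOutputFormat])

Still, that only covers the writing part, not splitting into separate RDDs.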

asked Dec 01 '14 by zork


1 Answer

Maybe something like this would work:

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def singlePassMultiFilter[T: ClassTag](
    rdd: RDD[T],
    f1: T => Boolean,
    f2: T => Boolean,
    level: StorageLevel = StorageLevel.MEMORY_ONLY
): (RDD[T], RDD[T], Boolean => Unit) = {
  // Collect the matches for both predicates in a single pass over each partition.
  val tempRDD = rdd.mapPartitions { iter =>
    val abuf1 = ArrayBuffer.empty[T]
    val abuf2 = ArrayBuffer.empty[T]
    for (x <- iter) {
      if (f1(x)) abuf1 += x
      if (f2(x)) abuf2 += x
    }
    Iterator.single((abuf1, abuf2))
  }
  tempRDD.persist(level)
  // Each result RDD just unpacks its half of the buffered pairs.
  val rdd1 = tempRDD.flatMap(_._1)
  val rdd2 = tempRDD.flatMap(_._2)
  (rdd1, rdd2, (blocking: Boolean) => tempRDD.unpersist(blocking))
}

Note that an action called on rdd1 (resp. rdd2) will cause tempRDD to be computed and persisted. This is practically equivalent to computing rdd2 (resp. rdd1) as well, since the overhead of the flatMap in the definitions of rdd1 and rdd2 is, I believe, pretty negligible.

You would use singlePassMultiFilter like so:

val (rdd1, rdd2, cleanUp) = singlePassMultiFilter(rdd, f1, f2)
rdd1.persist()    // I'm going to need `rdd1` again later...
println(rdd1.count)
println(rdd2.count)
cleanUp(true)     // done with `rdd2`, and `rdd1` has been persisted, so free things up
println(rdd1.distinct.count)

Clearly this could be extended to an arbitrary number of filters, collections of filters, etc.
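
For instance, here's an untested sketch of that generalization for an arbitrary sequence of predicates (same imports as the code above; singlePassMultiFilterN is just a made-up name):

def singlePassMultiFilterN[T: ClassTag](
    rdd: RDD[T],
    filters: Seq[T => Boolean],
    level: StorageLevel = StorageLevel.MEMORY_ONLY
): (Seq[RDD[T]], Boolean => Unit) = {
  val n = filters.length
  val tempRDD = rdd.mapPartitions { iter =>
    // One buffer per predicate, all filled in a single pass over the partition.
    val bufs = Array.fill(n)(ArrayBuffer.empty[T])
    for (x <- iter; i <- 0 until n if filters(i)(x)) bufs(i) += x
    Iterator.single(bufs)
  }
  tempRDD.persist(level)
  // The i-th result RDD unpacks the i-th buffer of every partition.
  val rdds = (0 until n).map(i => tempRDD.flatMap(bufs => bufs(i)))
  (rdds, (blocking: Boolean) => tempRDD.unpersist(blocking))
}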

answered Oct 02 '22 by Jason Lenderman