Is there any Spark function that splits a collection into several RDDs according to some criteria? Such a function would avoid excessive iteration. For example:
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) {
  val logFile = "file.txt"
  val conf = new SparkConf().setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val logData = sc.textFile(logFile, 2).cache()
  val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
  val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
}
In this example I have to iterate over `logData` twice just to write the results to two separate files:
val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
It would be nice instead to have something like this:
val resultMap = logData.map(line => if (line.contains("a")) ("a", line) else if (line.contains("b")) ("b", line) else (" - ", line))
resultMap.writeByKey("a", "linesA.txt")
resultMap.writeByKey("b", "linesB.txt")
Any such thing?
Maybe something like this would work:
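import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// T needs a ClassTag because flatMap builds an RDD[T] below
def singlePassMultiFilter[T: ClassTag](
    rdd: RDD[T],
    f1: T => Boolean,
    f2: T => Boolean,
    level: StorageLevel = StorageLevel.MEMORY_ONLY
): (RDD[T], RDD[T], Boolean => Unit) = {
  // single pass over each partition, filling one buffer per filter
  val tempRDD = rdd mapPartitions { iter =>
    val abuf1 = ArrayBuffer.empty[T]
    val abuf2 = ArrayBuffer.empty[T]
    for (x <- iter) {
      if (f1(x)) abuf1 += x
      if (f2(x)) abuf2 += x
    }
    Iterator.single((abuf1, abuf2))
  }
  tempRDD.persist(level)
  val rdd1 = tempRDD.flatMap(_._1)
  val rdd2 = tempRDD.flatMap(_._2)
  (rdd1, rdd2, (blocking: Boolean) => tempRDD.unpersist(blocking))
}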
Note that an action called on `rdd1` (resp. `rdd2`) will cause `tempRDD` to be computed and persisted. This is practically equivalent to computing `rdd2` (resp. `rdd1`), since the overhead of the `flatMap` in the definitions of `rdd1` and `rdd2` is, I believe, going to be pretty negligible.
You would use `singlePassMultiFilter` like so:
val (rdd1, rdd2, cleanUp) = singlePassMultiFilter(rdd, f1, f2)
rdd1.persist() //I'm going to need `rdd1` more later...
println(rdd1.count)
println(rdd2.count)
cleanUp(true) //I'm done with `rdd2` and `rdd1` has been persisted so free stuff up...
println(rdd1.distinct.count)
Clearly this could be extended to an arbitrary number of filters, collections of filters, etc.
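One way the extension to an arbitrary number of filters might look is sketched below. The name `singlePassFilters` and its signature are hypothetical (they are not part of Spark); the sketch follows the same idea as above, with one buffer per predicate filled in a single pass over each partition, and assumes each partition's matches fit in memory.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: generalizes the two-filter version to a Seq of predicates.
def singlePassFilters[T: ClassTag](
    rdd: RDD[T],
    predicates: Seq[T => Boolean],
    level: StorageLevel = StorageLevel.MEMORY_ONLY
): (Seq[RDD[T]], Boolean => Unit) = {
  val tempRDD = rdd.mapPartitions { iter =>
    // One buffer per predicate, all filled while walking the partition once.
    val buffers = Vector.fill(predicates.size)(ArrayBuffer.empty[T])
    val indexed = predicates.zipWithIndex
    for (x <- iter; (p, i) <- indexed) {
      if (p(x)) buffers(i) += x
    }
    Iterator.single(buffers)
  }
  tempRDD.persist(level)
  // Each output RDD reads only its own buffer from the persisted intermediate.
  val outputs = predicates.indices.map(i => tempRDD.flatMap(buffers => buffers(i)))
  (outputs, (blocking: Boolean) => { tempRDD.unpersist(blocking); () })
}
```

For the question's example this would be called as `val (Seq(lineAs, lineBs), cleanUp) = singlePassFilters(logData, Seq[String => Boolean](_.contains("a"), _.contains("b")))`, followed by one `saveAsTextFile` per output and then `cleanUp(true)`.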