Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark processing columns in parallel

I've been playing with Spark, and I managed to get it to crunch my data. My data consists of flat delimited text file, consisting of 50 columns and about 20 millions of rows. I have scala scripts that will process each column.

In terms of parallel processing, I know that RDD operation run on multiple nodes. So, every time I process a column, they are processed in parallel, but the column itself is processed sequentially.

A simple example: if my data is 5 column text delimited file and each column contain text, and I want to do word count for each column. I would do:

for(i <- 0 until 4){
   data.map(_.split("\t",-1)(i)).map((_,1)).reduce(_+_)
}

Although each column's operation is run in parallel, the column itself is processed sequentially(bad wording I know. Sorry!). In other words, column 2 is processed after column 1 is done. Column 3 is processed after column 1 and 2 are done, and so on.

My question is: Is there anyway to process multiple column at a time? If you know a way, cor a tutorial, would you mind sharing it with me?

thank you!!

like image 736
user2773013 Avatar asked Aug 06 '14 22:08

user2773013


2 Answers

Suppose the inputs are seq. Following can be done to process columns concurrently. The basic idea is to using sequence (column, input) as the key.

scala> val rdd = sc.parallelize((1 to 4).map(x=>Seq("x_0", "x_1", "x_2", "x_3")))
rdd: org.apache.spark.rdd.RDD[Seq[String]] = ParallelCollectionRDD[26] at parallelize at <console>:12

scala> val rdd1 = rdd.flatMap{x=>{(0 to x.size - 1).map(idx=>(idx, x(idx)))}}
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = FlatMappedRDD[27] at flatMap at <console>:14

scala> val rdd2 = rdd1.map(x=>(x, 1))
rdd2: org.apache.spark.rdd.RDD[((Int, String), Int)] = MappedRDD[28] at map at <console>:16

scala> val rdd3 = rdd2.reduceByKey(_+_)
rdd3: org.apache.spark.rdd.RDD[((Int, String), Int)] = ShuffledRDD[29] at reduceByKey at <console>:18

scala> rdd3.take(4)
res22: Array[((Int, String), Int)] = Array(((0,x_0),4), ((3,x_3),4), ((2,x_2),4), ((1,x_1),4))

The example output: ((0, x_0), 4) means the first column, key is x_0, and value is 4. You can start from here to process further.

like image 170
zhang zhan Avatar answered Sep 30 '22 18:09

zhang zhan


You can try the following code, which use the scala parallize collection feature,

(0 until 4).map(index => (index,data)).par.map(x => {
    x._2.map(_.split("\t",-1)(x._1)).map((_,1)).reduce(_+_)
}

data is a reference, so duplicate the data will not cost to much. And rdd is read-only, so parallelly processing can work. The par method use the parallely collection feature. You can check the parallel jobs on the spark web UI.

like image 22
bourneli Avatar answered Sep 30 '22 16:09

bourneli