 

Apache Spark: union operation is not performed

I know Spark does lazy evaluation.

But is this expected behaviour? With the program below, the output is 20.

But if the print statement

  System.out.println("/////////////////// After "+MainRDD.count());

is uncommented, the output will be 40.

I'm not doing this as-is in my application; I wrote this program just to demonstrate the issue.

SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JavaSparkSQL");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

JavaRDD<Integer> MainRDD;
JavaRDD<Integer> tmp;
ArrayList<Integer> list = new ArrayList<>();
for (int i = 0; i < 20; i++) {
    list.add(i);
}

MainRDD = sc.parallelize(list);
System.out.println("//////////////////////First " + MainRDD.count());

list.clear();
for (int i = 20; i < 25; i++) {
    for (int j = 1; j < 5; j++) {
        list.add(i * j);
    }
    tmp = sc.parallelize(list);

    // System.out.println("/////////////////// Before " + MainRDD.count());
    MainRDD = MainRDD.union(tmp);
    // System.out.println("/////////////////// After " + MainRDD.count());
    list.clear();
}

System.out.println("/////////////////// last " + MainRDD.count());
Anil asked Oct 30 '22


1 Answer

The source of the problem is the mutable data structure you use to populate the RDDs. When you call sc.parallelize(list), it doesn't capture the state of the ArrayList. Since you call clear inside the loop, by the time the data is actually evaluated there is no data left at all.

Truth be told, I don't know why this behavior changes when you call the count method. Since the RDD is not cached, my guess is that it is a matter of either Spark or JVM internals, but I won't even try to guess what is really going on there. Maybe someone smarter will be able to explain the precise reason for this behavior.

Just to illustrate what is going on:

val arr = Array(1, 2, 3)

val rdd = sc.parallelize(arr)

(0 until 3).foreach(arr(_) = 99)
val tmp = sc.parallelize(arr)

tmp.union(rdd).collect
// Array[Int] = Array(99, 99, 99, 99, 99, 99) 

vs.

val arr = Array(1, 2, 3)

val rdd = sc.parallelize(arr)
rdd.count()
// Long = 3

(0 until 3).foreach(arr(_) = 99)
val tmp = sc.parallelize(arr)

tmp.union(rdd).collect
// Array[Int] = Array(99, 99, 99, 1, 2, 3)

sc.getRDDStorageInfo
// Array[org.apache.spark.storage.RDDInfo] = Array()
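The practical fix is a defensive copy, so the list's state is snapshotted at the moment you hand it over rather than shared by reference. A minimal plain-Java sketch (no Spark required) of the reference-vs-copy difference; the class and variable names here are illustrative, not from the original code:

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotDemo {
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            list.add(i);
        }

        // Holding a reference, like a lazily evaluated RDD over the list:
        List<Integer> byReference = list;
        // Defensive copy: the current contents are captured now:
        List<Integer> snapshot = new ArrayList<>(list);

        list.clear(); // mutate before "evaluation"

        System.out.println(byReference.size()); // 0  -- the data is gone
        System.out.println(snapshot.size());    // 20 -- the copy is unaffected
    }
}
```

In the question's loop, passing a copy, e.g. sc.parallelize(new ArrayList<>(list)), sidesteps the problem no matter when Spark actually evaluates the RDD.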
zero323 answered Nov 11 '22