Map function of RDD not being invoked in Scala Spark

When I call the map function of an RDD, it is not being applied. It works as expected for a scala.collection.immutable.List, but not for an RDD. Here is some code to illustrate:

val list = List("a", "d", "c", "d")
list.map(l => {
  println("mapping list")
})

val tm = sc.parallelize(list)
tm.map(m => {
  println("mapping RDD")
})

The result of the above code is:

mapping list
mapping list
mapping list
mapping list

But notice that "mapping RDD" is never printed to the screen. Why is this occurring?

This is part of a larger issue where I am trying to populate a HashMap from an RDD:

def getTestMap(dist: RDD[String]) = {
  var testMap = new java.util.HashMap[String, String]()

  dist.map(m => {
    println("populating map")
    testMap.put(m, m)
  })
  testMap
}
val testM = getTestMap(tm)
println(testM.get("a"))

This code prints null.

Is this due to lazy evaluation?

asked Jun 24 '14 by blue-sky

1 Answer

Lazy evaluation is likely part of this, if map is the only operation you are executing. Spark will not schedule execution until an action (in Spark terms) is requested on the RDD lineage.

When you execute an action, the println will happen, but not on the driver where you are expecting it; rather, it happens on the worker executing that closure. Try looking at the logs of the workers.
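As a sketch (assuming the spark-shell `sc` and the `tm` RDD from the question), adding an action forces the map to run, and collecting first makes the println happen on the driver:

```scala
// Assumes the spark-shell `sc` and the RDD `tm` from the question.
val mapped = tm.map { m =>
  println("mapping RDD") // runs on the executors; look in the worker logs
  m
}
mapped.count() // an action: only now does the map actually execute

// To see the output on the driver, bring the data back first:
tm.collect().foreach(m => println(s"on driver: $m"))
```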

A similar thing is happening with the HashMap population in the second part of the question. The same piece of code will be executed on each partition, on separate workers, and the results will be serialized back to the driver. Given that closures are 'cleaned' by Spark, testMap is probably being removed from the serialized closure, resulting in a null. Note that if it were only due to the map not being executed, the hashmap should be empty, not null.
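One idiomatic way around this (a sketch, not from the original answer) is to build key/value pairs on the executors and let an action assemble the map on the driver, for instance with collectAsMap:

```scala
// Sketch: instead of mutating a driver-side HashMap inside a closure,
// emit (key, value) pairs and use the collectAsMap action, which
// triggers execution and returns the result to the driver.
val driverMap: scala.collection.Map[String, String] =
  tm.map(m => (m, m)).collectAsMap()

println(driverMap.get("a")) // prints Some(a) on the driver
```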

If you want to transfer the data of the RDD to another structure, you need to do that in the driver. Therefore you need to force Spark to deliver all the data to the driver. That's the function of rdd.collect().

This should work for your case. Be aware that all the RDD data should fit in the memory of your driver:

import scala.collection.JavaConverters._
def getTestMap(dist: RDD[String]) = dist.collect.map(m => (m, m)).toMap.asJava
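With this version, the lookup from the question behaves as expected (assuming the same `tm`):

```scala
val testM = getTestMap(tm)
println(testM.get("a")) // prints a rather than null
```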
answered Oct 16 '22 by maasg