Modify collection inside a Spark RDD foreach

I'm trying to add elements to a map while iterating over the elements of an RDD. I'm not getting any errors, but the modifications never take effect.

It all works fine when adding elements directly or iterating over other collections:

scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()

scala> myMap("test1")="test1"

scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)

scala> List("test2", "test3").foreach(w => myMap(w) = w)

scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

But when I try to do the same from an RDD:

scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)

scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

I've tried printing the map's pre-existing contents from inside the foreach to make sure it's the same variable, and it prints correctly:

scala> fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...

I've also printed the modified element from inside the foreach, and it prints as modified; but once the operation completes, the map appears unchanged.

scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))})
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

Converting the RDD to a local array with collect also works fine:

scala> fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)

Is this a context problem? Am I accessing a copy of the data that is being modified somewhere else?

asked Apr 30 '14 by palako

People also ask

Is foreach an action in Spark?

foreach() is an action. It does not return a value; it executes the input function on each element of the RDD.
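For instance, a minimal sketch (assuming a SparkContext named sc, as in the question):

// foreach is an action: it returns Unit and runs the function on the
// executors purely for its side effects.
val nums = sc.parallelize(Seq(1, 2, 3))
nums.foreach(n => println(n))  // on a cluster, output goes to executor logs, not the driver console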

Can data in RDD be changed once RDD is created?

No. A transformation takes an RDD as input and produces one or more new RDDs as output; every transformation creates a new RDD. The input RDDs cannot be changed, since RDDs are immutable.
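A small illustration of this (again assuming a SparkContext named sc):

// map is a transformation: it returns a new RDD and leaves its input untouched.
val original = sc.parallelize(Seq("a", "b", "c"))
val upper = original.map(_.toUpperCase)  // new RDD; `original` is unchanged
upper.collect()  // Array(A, B, C)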

What is foreach in Pyspark?

In PySpark, foreach is an action available on RDDs and DataFrames. It iterates over every element of the dataset and applies the given function to each one for its side effects.

Is reduce an action in RDD?

Yes. reduce is an action that aggregates all the elements of the RDD using a function and returns the final result to the driver program (there is also reduceByKey, which instead returns a distributed dataset).
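A quick sketch of the difference (assuming a SparkContext named sc):

// reduce is an action: it aggregates across all partitions and returns
// a single value to the driver.
val sum = sc.parallelize(1 to 100).reduce(_ + _)  // 5050

// reduceByKey is a transformation on pair RDDs: it returns a new,
// still-distributed RDD instead of a value on the driver.
val counts = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).reduceByKey(_ + _)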


1 Answer

It becomes clearer when running on a Spark cluster (not a single machine). The RDD is spread over several machines. When you call foreach, you tell each machine what to do with the piece of the RDD that it has. If the closure refers to any local variables (like myMap), they get serialized and sent to those machines so they can use them. But nothing comes back, so your original copy of myMap on the driver is unaffected.

I think this answers your question, but obviously you are trying to accomplish something and you will not be able to get there this way. Feel free to explain here or in a separate question what you are trying to do, and I will try to help.
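If the goal is simply to end up with the map on the driver, one possible sketch (using the fromFile RDD from the question; collectAsMap is an action on pair RDDs that returns a driver-local map):

// Build the key/value pairs on the executors, then bring them back to
// the driver with an action.
val localMap = fromFile.map(w => (w, w)).collectAsMap()

This is essentially what your working collect-based version does, just expressed as a pair RDD.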

answered Oct 01 '22 by Daniel Darabos