 

Why does foreach not bring anything to the driver program?

Tags:

apache-spark

I wrote this program in the spark shell

val array = sc.parallelize(List(1, 2, 3, 4))
array.foreach(x => println(x))

This prints some debug statements but not the actual numbers.

The code below works fine

for(num <- array.take(4)) {
  println(num)
}

I do understand that take is an action and therefore will cause Spark to trigger the lazy computation.

But foreach should have worked the same way... why did foreach not bring anything back from Spark and start doing the actual processing (get out of lazy mode)?

How can I make foreach on the RDD work?

asked Mar 02 '15 by Knows Not Much



1 Answer

The RDD.foreach method in Spark runs on the cluster, so each worker that holds a slice of the records executes the function you pass to foreach. In other words, your code does run, but the println calls write to each Spark worker's stdout, not to the driver/your shell session. If you look at the stdout of your Spark workers, you will see the numbers printed there.

You can view the stdout on the workers through the web UI that each executor exposes. An example URL is http://workerIp:workerPort/logPage/?appId=app-20150303023103-0043&executorId=1&logType=stdout

[Screenshot: Spark executor stdout]

In this example Spark chooses to put all the records of the RDD in the same partition.
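
If you want to check how the elements were actually split up, a quick way (assuming the array RDD from the question) is to look at the partitions directly; glom() groups each partition's elements into an array, which is fine to collect for a small RDD like this one:

// How many partitions does the RDD have?
println(array.partitions.length)

// glom() turns each partition into an Array, so collecting shows
// which elements ended up in which partition (only do this on small RDDs)
array.glom().collect().zipWithIndex.foreach { case (part, i) =>
  println(s"partition $i: ${part.mkString(", ")}")
}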

That foreach doesn't bring anything back to the driver makes sense if you think about it - look at the function signature for foreach - it doesn't return anything.

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit

This is really the purpose of foreach in Scala - it's used for side effects.
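
For comparison, foreach on an ordinary Scala collection behaves the same way - it returns Unit and is only useful for its side effects:

// foreach on a plain Scala List returns Unit; it exists purely for side effects
val result: Unit = List(1, 2, 3, 4).foreach(println)

// If you want a new collection back, use map instead
val doubled = List(1, 2, 3, 4).map(_ * 2)  // List(2, 4, 6, 8)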

When you collect records, you bring them back into the driver, so collect/take operations end up working on an ordinary Scala collection inside the Spark driver. That's why you can see the output: the Spark driver/spark shell is what's printing to stdout in your session.
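
So if you want the numbers to appear in your shell session, bring them back to the driver first. A minimal sketch, using the array RDD from the question:

// take(4) is an action: it returns a plain Array[Int] to the driver
val local: Array[Int] = array.take(4)

// This foreach runs on an ordinary Scala Array inside the driver JVM,
// so the output shows up in your spark-shell session
local.foreach(println)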

A use case for foreach may not seem immediately apparent. As an example: if for each record in the RDD you wanted to trigger some external behaviour, like calling a REST API, you could do that in the foreach, and each Spark worker would submit a call to the API server for the values it holds. If foreach did bring the records back, you could easily blow out the memory in the driver/shell process. This way you avoid that problem and can apply side effects to all the items in an RDD across the cluster, as in the sketch below.
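
A rough sketch of that pattern (the postToApi helper and the URL are made up for illustration, not part of Spark; in practice you would often prefer foreachPartition so you can set up one HTTP client per partition):

// Hypothetical helper standing in for a real HTTP client call
def postToApi(endpoint: String, value: Int): Unit = {
  // build and send the request here; each worker runs this locally
  println(s"POST $endpoint payload=$value")
}

// Runs on the workers: one call per record, nothing comes back to the driver
array.foreach(x => postToApi("http://example.com/api/records", x))

// foreachPartition variant: set up a client once per partition, then reuse it
array.foreachPartition { records =>
  // (create the client/connection here)
  records.foreach(x => postToApi("http://example.com/api/records", x))
}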

If you want to see what's in an RDD, I use:

array.collect.foreach(println) 
//Instead of collect, use take(...) or takeSample(...) if the RDD is large
answered Nov 09 '22 by NightWolf