
PySpark distributing module imports

Over the past few days I've been trying to understand how Spark executors know to use a module under a given name after it has been imported. I am working on AWS EMR. Situation: I initialize pyspark on EMR by typing

pyspark --master yarn

Then, in pyspark,

import numpy as np ## notice the naming

def myfun(x):
    n = np.random.rand(1)
    return x*n

rdd = sc.parallelize([1,2,3,4], 2)
rdd.map(lambda x: myfun(x)).collect() ## works!

My understanding is that when I import numpy as np, the master node is the only node importing and identifying numpy through np. However, with an EMR cluster (2 worker nodes), if I run the map function on the rdd, the driver program sends the function to the worker nodes to execute the function for each item in the list (for each partition), and a successful result is returned.

My question is this: How do the workers know that numpy should be imported as np? Each worker has numpy already installed, but I've not explicitly defined a way for each node to import the module as np.

Please refer to the following post by Cloudera for further details on dependencies: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/

Under Complex Dependency they have an example (code) where the pandas module is explicitly imported on each node.

One theory that I've heard being thrown around is that the driver program distributes all code passed in the pyspark interactive shell. I am skeptical of this. The example I bring up to counter this idea is, if on the master node I type:

print "hello"

is every worker node also printing "hello"? I don't think so. But maybe I am wrong on this.

Jon asked Aug 08 '16 21:08



1 Answer

When a function is serialized, a number of objects are saved along with it:

  • code
  • globals
  • defaults
  • closure
  • dict

which can later be used to restore the complete environment required by the function.
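As a small illustration of where these pieces live on an ordinary Python function, here is a stdlib-only sketch. The names make_scaler, scale and note are made up for the example and have nothing to do with Spark itself:

```python
# Hypothetical example function: it has a default argument, a closure
# variable (factor) and a custom attribute, so every slot is non-empty.
def make_scaler(factor):
    def scale(x, offset=1):
        return x * factor + offset
    return scale

scale = make_scaler(3)
scale.note = "demo"  # ends up in scale.__dict__

# The attributes a serializer can read to rebuild the function elsewhere.
parts = {
    "code": scale.__code__.co_name,                            # compiled body
    "globals": "make_scaler" in scale.__globals__,             # module-level names
    "defaults": scale.__defaults__,                            # default argument values
    "closure": [c.cell_contents for c in scale.__closure__],   # captured variables
    "dict": scale.__dict__,                                    # function attributes
}
print(parts)
```

Each entry corresponds to one item in the list above; a pickler only needs to save these and reassemble them on the other side.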

Since np is referenced by the function, its name can be extracted from the function's code:

from pyspark.cloudpickle import CloudPickler

CloudPickler.extract_code_globals(myfun.__code__)
## {'np'}

and the binding can be extracted from its globals:

myfun.__globals__['np']
## <module 'numpy' from ...

So the serialized closure (in a broad sense) captures all the information required to restore the environment. Of course, every module accessed in the closure has to be importable on every worker machine.
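To make the idea concrete, here is a deliberately simplified, stdlib-only sketch of the round trip. It is not cloudpickle's actual implementation: the helpers referenced_globals, dump and load are hypothetical, and random imported as rnd stands in for numpy imported as np. The point is only that the alias travels with the function, while the module itself is re-imported by its real name on the receiving side.

```python
import builtins
import importlib
import marshal
import types

import random as rnd  # aliased import, analogous to `import numpy as np`


def myfun(x):
    return x * rnd.random()


def referenced_globals(func):
    """Names used in the function body that are bound in its globals."""
    return {n for n in func.__code__.co_names if n in func.__globals__}


def dump(func):
    """'Driver' side: save the code plus an alias -> real module name map."""
    modules = {}
    for name in referenced_globals(func):
        value = func.__globals__[name]
        if isinstance(value, types.ModuleType):
            modules[name] = value.__name__  # e.g. 'rnd' -> 'random'
    return marshal.dumps(func.__code__), modules


def load(payload):
    """'Worker' side: re-import each module and bind it under its alias."""
    code_bytes, modules = payload
    namespace = {alias: importlib.import_module(real)
                 for alias, real in modules.items()}
    namespace["__builtins__"] = builtins
    return types.FunctionType(marshal.loads(code_bytes), namespace)


payload = dump(myfun)
restored = load(payload)
print(payload[1])   # the alias mapping that travels with the function
print(restored(2))  # runs using the re-imported module
```

Note that load fails with an ImportError if the module is not installed where it runs, which is exactly the requirement stated above.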

Everything else is just reading and writing machinery.

On a side note, the master node shouldn't execute any Python code. It is responsible for resource allocation, not for running application code.

zero323 answered Oct 21 '22 14:10