Over the past few days I've been working on trying to understand how Spark executors know how to use a module by a given name upon import. I am working on AWS EMR. Situation: I initialize pyspark on EMR by typing
pyspark --master yarn
Then, in pyspark,
import numpy as np ## notice the naming
def myfun(x):
n = np.random.rand(1)
return x*n
rdd = sc.parallelize([1,2,3,4], 2)
rdd.map(lambda x: myfun(x)).collect() ## works!
My understanding is that when I import numpy as np
, the master node is the only node importing and identifying numpy
through np
. However, with an EMR cluster (2 worker nodes), if I run the map function on the rdd, the driver program sends the function to the worker nodes to execute the function for each item in the list (for each partition), and a successful result is returned.
My question is this: How do the workers know that numpy should be imported as np? Each worker has numpy already installed, but I've not defined explicitly defined a way for each node to import the module as np
.
Please refer to the following post by Cloudera for further details on dependencies: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/
Under Complex Dependency they have an example (code) where the pandas module is explicitly imported on each node.
One theory that I've heard being thrown around is that the driver program distributes all code passed in the pyspark interactive shell. I am skeptical of this. The example I bring up to counter this idea is, if on the master node I type:
print "hello"
is every worker node also printing "hello"? I don't think so. But maybe I am wrong on this.
Import PySpark in Python Using findspark path at runtime so that you can import PySpark modules. First Install findspark using pip command. Post successful installation, import it in Python program or shell to validate PySpark imports.
PySpark is a Spark library written in Python to run Python applications using Apache Spark capabilities. hence, you can install PySpark with all its features by installing Apache Spark.
When function is serialized there is a number of objects is being saved:
which can be later used to restore complete environment required for a given function.
Since np
is referenced by the function it can be extracted from its code:
from pyspark.cloudpickle import CloudPickler
CloudPickler.extract_code_globals(myfun.__code__)
## {'np'}
and binding can be extracted from its globals
:
myfun.__globals__['np']
## <module 'numpy' from ...
So serialized closure (in a broad sense) captures all information required to restore environment. Of course all modules accessed in the closure have to be importable on every worker machine.
Everything else is just reading and writing machinery.
On a side note master node shouldn't execute any Python code. It is responsible for resources allocation not running application code.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With