I am running Spark 1.3 in standalone mode in a Cloudera environment. I can run PySpark from an IPython notebook, but as soon as I add a second worker node my code stops running and returns an error. I am fairly sure this is because modules on my master are not visible to the worker node. I tried importing numpy but it didn't work, even though numpy is installed on my worker through Anaconda. I have Anaconda installed on both master and worker in the same way.
Following Josh Rosen's advice, I made sure the libraries are installed on the worker nodes:
https://groups.google.com/forum/#!topic/spark-users/We_F8vlxvq0
However, I am still running into issues, including the fact that my worker does not recognize the command abs, which is standard in Python 2.6.
The code I am running is from this post:
https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python
def isprime(n):
    """
    Check if integer n is a prime.
    """
    # make sure n is a positive integer
    n = abs(int(n))
    # 0 and 1 are not primes
    if n < 2:
        return False
    # 2 is the only even prime number
    if n == 2:
        return True
    # all other even numbers are not primes
    if not n & 1:
        return False
    # range starts with 3 and only needs to go up to the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5) + 1, 2):
        if n % x == 0:
            return False
    return True
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))
# Compute the number of primes in the RDD
print nums.filter(isprime).count()
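To confirm whether numpy is actually visible to the executors, the only rough check I have come up with is to attempt the import inside a small Spark job. This is just a sketch (check_numpy is a throwaway helper of mine, and it assumes the SparkContext sc from the notebook):

def check_numpy(_):
    # Attempt the import on the executor and report the version (or None).
    try:
        import numpy
        return numpy.__version__
    except ImportError:
        return None

# Spread the probe over several partitions so it is likely to hit every worker.
print sc.parallelize(range(8), 8).map(check_numpy).distinct().collect()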
I often use the Anaconda distribution with PySpark as well, and I find it useful to set the PYSPARK_PYTHON variable to point at the Python binary inside the Anaconda distribution. I've found that otherwise I get lots of strange errors. You can check which Python is being used by running rdd.map(lambda x: sys.executable).distinct().collect(). I suspect it's not pointing to the correct location.
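Spelled out, that check might look something like the snippet below. This is only a sketch: it assumes the SparkContext is available as sc, and it simply compares the interpreter the driver is using with whatever the executors report.

import sys

# The Python binary the driver (i.e. the notebook kernel) is running under.
print "driver:   ", sys.executable

# The Python binaries the executors are running under.
rdd = sc.parallelize(range(8), 8)
print "executors:", rdd.map(lambda x: sys.executable).distinct().collect()

# If these differ, or the executor path is not your Anaconda binary, then
# PYSPARK_PYTHON is probably not set (or not set consistently) on the workers.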
In any case, I recommend wrapping the configuration of your path and environment variables in a script. I use the following.
import os
import sys

def configure_spark(spark_home=None, pyspark_python=None):
    spark_home = spark_home or "/path/to/default/spark/home"
    os.environ['SPARK_HOME'] = spark_home

    # Add the PySpark directories to the Python path:
    sys.path.insert(1, os.path.join(spark_home, 'python'))
    sys.path.insert(1, os.path.join(spark_home, 'python', 'pyspark'))
    sys.path.insert(1, os.path.join(spark_home, 'python', 'build'))

    # If PySpark isn't specified, use currently running Python binary:
    pyspark_python = pyspark_python or sys.executable
    os.environ['PYSPARK_PYTHON'] = pyspark_python
When you point to your Anaconda binary, you should also be able to import all the packages installed in its site-packages directory. This technique should work for conda environments as well.
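For example, you might call it at the top of the notebook along these lines. The paths and master URL below are placeholders rather than values from your setup, so substitute wherever Spark and Anaconda actually live on your nodes:

configure_spark(
    spark_home="/opt/spark-1.3.0",              # placeholder Spark home
    pyspark_python="/opt/anaconda/bin/python",  # placeholder Anaconda binary
)

# Import pyspark only after SPARK_HOME and sys.path have been configured.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("spark://your-master:7077").setAppName("primes")
sc = SparkContext(conf=conf)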