The situation is as follows: working on an enterprise cluster with Spark 2.3, I want to run pandas_udf, which requires pyarrow, which in turn requires numpy 1.14 (AFAIK). I've been able to distribute pyarrow (I think, though I have no way of verifying this 100%):
import pyspark

spark = pyspark.sql.SparkSession.builder.appName("pandas_udf_poc")\
    .config("spark.executor.instances", "2")\
    .config("spark.executor.memory", "8g")\
    .config("spark.driver.memory", "8g")\
    .config("spark.driver.maxResultSize", "8g")\
    .config("spark.submit.pyFiles", "pyarrow_depnd.zip")\
    .getOrCreate()
spark.sparkContext.addPyFile("pyarrow_depnd.zip")
The zip is the result of pip-installing the packages into a directory and zipping that directory.
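For reference, building such a zip goes roughly along these lines (the directory layout here is an example, not necessarily exactly what I used):

pip install -t pyarrow_depnd pyarrow
cd pyarrow_depnd
zip -r ../pyarrow_depnd.zip .
cd ..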
But pyarrow does not play along with the nodes' numpy 1.13. I guess I could try to distribute a full env to all nodes, but my question is: is there a way to avoid this and make the nodes use a different numpy (one that is already distributed in the pyarrow zip)?
Thanks
Well, in the end I did not have to use a virtual env, but I could not avoid distributing a full copy of Python (containing the needed dependencies) to all nodes.
First, build a full copy of Python (I used a conda env, but you could probably do it other ways):
conda create --prefix /home/me/env_conda_for_pyarrow
source activate /home/me/env_conda_for_pyarrow
conda install numpy
conda install pyarrow
In this specific case I had to add the conda-forge channel before installing, in order to get the latest versions.
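For reference, the channel can be added like this (or passed per install with -c):

conda config --add channels conda-forge
# or per package:
conda install -c conda-forge pyarrow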
Second, zip the distribution:
zip -r env_conda_for_pyarrow.zip env_conda_for_pyarrow
Then use spark.yarn.dist.archives to distribute the zip, and point the PYSPARK_PYTHON env var at the Python inside it:
import os
os.environ['PYSPARK_PYTHON'] = "dist_python/env_conda_for_pyarrow/bin/python"

import pyspark
spark = \
    pyspark.sql.SparkSession.builder.appName("pyspark_python")\
    .config("spark.yarn.dist.archives", "env_conda_for_pyarrow.zip#dist_python")\
    .getOrCreate()

print(spark.version, spark.sparkContext.master)
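For completeness, roughly the same thing as a spark-submit call should look something like this in YARN client mode (your_job.py is just a placeholder):

export PYSPARK_DRIVER_PYTHON=python   # keep the local python for the driver (client mode only)
export PYSPARK_PYTHON=dist_python/env_conda_for_pyarrow/bin/python
spark-submit --archives env_conda_for_pyarrow.zip#dist_python your_job.py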
That's it, done. Here are a few scripts I've used for testing:
def list_nodes_dir(x):  # hack to see the workers' file dirs
    import os
    return os.listdir('dist_python')
spark.sparkContext.parallelize(range(1), 1).map(list_nodes_dir).collect()
def npv(x):  # hack to see the workers' numpy version
    import numpy as np
    return np.__version__
set(spark.sparkContext.parallelize(range(10), 10).map(npv).collect())
# example from the Spark docs
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType, StringType
slen = pandas_udf(lambda s: s.str.len(), IntegerType())
@pandas_udf(StringType())
def to_upper(s):
    return s.str.upper()

@pandas_udf("integer", PandasUDFType.SCALAR)
def add_one(x):
    return x + 1
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"),
          add_one("age")).show()