I would like to efficiently save/read numpy arrays to/from HDFS inside worker functions in PySpark. I have two machines, A and B. A runs the master and a worker; B runs one worker. For example, I would like to achieve something like the following:
from pyspark import SparkConf, SparkContext

def func(iterator):
    P = << LOAD from HDFS or shared memory as a numpy array >>
    for x in iterator:
        P = P + x
    << SAVE P (a numpy array) to HDFS / shared file system >>

if __name__ == "__main__":
    conf = SparkConf().setMaster("local").setAppName("Test")
    sc = SparkContext(conf=conf)
    sc.parallelize([0, 1, 2, 3], 2).foreachPartition(func)
What can be a fast and efficient method for this?
I stumbled upon the same problem and finally settled on a workaround using the HdfsCli module and temp files with Python 3.4.
import numpy
from hdfs import InsecureClient
from tempfile import TemporaryFile

def get_hdfs_client():
    return InsecureClient("<your webhdfs uri>", user="<hdfs user>",
                          root="<hdfs base path>")

hdfs_client = get_hdfs_client()

# load from file.npy
path = "/whatever/hdfs/file.npy"
tf = TemporaryFile()
with hdfs_client.read(path) as reader:
    tf.write(reader.read())
tf.seek(0)  # important: set the cursor to the beginning of the file
np_array = numpy.load(tf)

...

# save to file.npy
tf = TemporaryFile()
numpy.save(tf, np_array)
tf.seek(0)  # important: set the cursor to the beginning of the file
# with overwrite=False, an exception is thrown if the file already exists
hdfs_client.write("/whatever/output/file.npy", tf.read(), overwrite=True)
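To connect this back to the question's foreachPartition pattern, here is a minimal sketch of how the load/save steps above could be wrapped into the partition function. The helper names (load_npy_from_hdfs, save_npy_to_hdfs) and the HDFS paths are my own placeholders, and the webhdfs URI still has to be filled in; this is one possible way to wire it up, not something the answer above spells out.

import numpy
from hdfs import InsecureClient
from tempfile import TemporaryFile

def load_npy_from_hdfs(client, hdfs_path):
    # stream the HDFS file into a local temp file, then let numpy parse it
    tf = TemporaryFile()
    with client.read(hdfs_path) as reader:
        tf.write(reader.read())
    tf.seek(0)
    return numpy.load(tf)

def save_npy_to_hdfs(client, hdfs_path, array):
    # serialize the array into a temp file, then upload its bytes to HDFS
    tf = TemporaryFile()
    numpy.save(tf, array)
    tf.seek(0)
    client.write(hdfs_path, tf.read(), overwrite=True)

def func(iterator):
    # create the client on the worker rather than capturing a driver-side
    # object in the task closure
    client = InsecureClient("<your webhdfs uri>", user="<hdfs user>")
    P = load_npy_from_hdfs(client, "/whatever/hdfs/file.npy")
    for x in iterator:
        P = P + x
    save_npy_to_hdfs(client, "/whatever/output/file.npy", P)

Note that with overwrite=True, partitions writing to the same output path will clobber each other, so in practice you would usually derive a per-partition output path (for example, from the partition id).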
Notes:
- the URI of the webhdfs client begins with http://, because it uses the web interface of the HDFS file system;
- the advantage of using temp files (instead of regular files in /tmp) is that you ensure no garbage files stay on the cluster machines after the script ends, normally or not (see the small sketch after these notes).
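To illustrate that last note: a TemporaryFile needs no manual cleanup; it is removed as soon as it is closed (on POSIX it is unlinked from the file system immediately), and using it as a context manager makes the cleanup deterministic even when an exception is raised. This is just a generic sketch, not code from the answer above.

from tempfile import TemporaryFile

with TemporaryFile() as tf:
    tf.write(b"scratch data")
    tf.seek(0)
    assert tf.read() == b"scratch data"
# here the temp file has already been closed and removed from local disk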