Most efficient way to access binary files on ADLS from worker node in PySpark?

I have deployed an Azure HDInsight cluster with rwx permissions for all directories on the Azure Data Lake Store that also serves as its storage account. On the head node, I can load e.g. image data from the ADLS with a command like:

my_rdd = sc.binaryFiles('adl://{}.azuredatalakestore.net/my_file.png')

Workers do not have access to the SparkContext's binaryFiles() function. I can use the azure-datalake-store Python SDK to load the files instead, but this seems to be much slower, presumably because it realizes none of the benefits of the association between the cluster and the ADLS.
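For reference, the SDK-based read on a worker looks roughly like the following sketch; the store name and service-principal credentials are placeholders:

from azure.datalake.store import core, lib

# Placeholder service-principal credentials and store name
token = lib.auth(tenant_id='...', client_id='...', client_secret='...')
adls = core.AzureDLFileSystem(token, store_name='mystore')

# Read one image's raw bytes
with adls.open('/my_file.png', 'rb') as f:
    data = f.read()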

Is there a faster way to load files from an associated ADLS on workers?

Further context if needed:

I am using PySpark to apply a trained deep learning model to a large collection of images. Since the model takes a long time to load, my ideal would be:

  • Send each worker a partial list of image URIs to process (by applying mapPartitions() to an RDD containing the full list)
  • Have the worker load data for one image at a time for scoring with the model
  • Return the model's results for the set of images
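In code, the flow I have in mind is roughly the following sketch, where load_model, load_image_bytes, and score are hypothetical placeholders for my model and I/O code:

def score_partition(uris):
    model = load_model()                 # expensive model load, once per partition
    for uri in uris:
        data = load_image_bytes(uri)     # fetch one image's bytes on the worker
        yield (uri, score(model, data))  # score it and emit the result

uri_rdd = sc.parallelize(image_uris)     # image_uris: full list of ADLS paths
results = uri_rdd.mapPartitions(score_partition).collect()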

Since I don't know how to load the images efficiently on workers, my best bet at the moment is to partition an RDD containing the image byte data, which (I assume) is memory-inefficient and creates a bottleneck by having the head node do all of the data loading.

1 Answer

The primary storage of the HDInsight cluster (in this case, your ADLS account) is simply available as the HDFS root, so files can be addressed with plain paths:

hdfs dfs -ls /user/digdug/images/
Found 3 items
-rw-r--r--   1    digdug supergroup       4957 2017-01-24 07:59 /user/digdug/images/a.png
-rw-r--r--   1    digdug supergroup       4957 2017-01-24 07:59 /user/digdug/images/b.png
-rw-r--r--   1    digdug supergroup       1945 2017-01-24 08:01 /user/digdug/images/c.png

In PySpark:

# Read every file in the directory as (path, bytes) pairs
rdd = sc.binaryFiles("/user/digdug/images")

def f(iterator):
    # iterator yields the (path, content) pairs in this partition
    sizes = []
    for path, content in iterator:
        sizes.append(len(content))
    return sizes

rdd.mapPartitions(f).collect()

outputs:

[4957, 4957, 1945]
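Each partition is processed on an executor, so the image bytes are read by the workers rather than funneled through the head node. For your use case, the same pattern should extend to per-partition model scoring; a rough sketch, where load_model and score are placeholders for your model code:

images = sc.binaryFiles("/user/digdug/images")

def score_partition(files):
    model = load_model()                     # hypothetical: load the model once per partition
    for path, content in files:
        yield (path, score(model, content))  # score each image's raw bytes

results = images.mapPartitions(score_partition).collect()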