 

Unable to read images simultaneously (in parallel) using pyspark

I have 10 JPEG images in a directory. I want to read all of them in parallel using pyspark. I tried the following.

import glob

import numpy as np
from PIL import Image

from pyspark import SparkContext, SparkConf

conf = SparkConf()
spark = SparkContext(conf=conf)

files = glob.glob("E:\\tests\\*.jpg")

files_ = spark.parallelize(files)

arrs = []
for fi in files_.toLocalIterator():
    im = Image.open(fi)
    data = np.asarray(im)
    arrs.append(data)

img = np.array(arrs)
print(img.shape)

The code ended without error and printed out img.shape; however, it did not run in parallel. Could you help me?

asked May 26 '21 by Roman


People also ask

How do you use SparkContext?

To create a SparkContext you first need to build a SparkConf object that contains information about your application. Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.
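
For example, a minimal setup might look like the sketch below (the app name and local master URL are illustrative, not from the original post):

from pyspark import SparkConf, SparkContext

# Build a SparkConf describing the application (illustrative values)
conf = SparkConf().setAppName("image-loader").setMaster("local[*]")
sc = SparkContext(conf=conf)

# ... work with sc ...

# Only one SparkContext may be active per JVM, so stop it before creating another
sc.stop()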

What is PySpark SparkContext?

A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster. When you create a new SparkContext, at least the master and app name should be set, either through the named parameters or through conf.
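
As a small illustration (the data and names here are made up), both an RDD and a broadcast variable are created from the same SparkContext:

# Assuming `sc` is an active SparkContext
rdd = sc.parallelize([1, 2, 3, 4])       # an RDD distributed over the cluster
lookup = sc.broadcast({"factor": 10})    # a read-only value shipped to every executor

print(rdd.map(lambda x: x * lookup.value["factor"]).collect())  # [10, 20, 30, 40]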

How do I parallelize my pyspark code?

This post discusses three different ways of achieving parallelization in PySpark. Native Spark: if you’re using Spark data frames and libraries (e.g. MLlib), then your code will be parallelized and distributed natively by Spark.
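
A minimal sketch of that native approach (the toy data is made up): DataFrame operations are planned and executed across the executors by Spark itself, without any explicit parallel code.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "a")], ["id", "label"])
# The aggregation below is distributed natively by Spark
df.groupBy("label").count().show()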

What is RDD in pyspark?

In the Spark ecosystem, the RDD is the basic data structure used in PySpark: an immutable collection of objects that is the starting point for a Spark application. The data is computed on different nodes of a Spark cluster, which is what makes the parallel processing happen.
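
As an illustrative snippet (assuming `sc` is an active SparkContext), each transformation returns a new, immutable RDD, and nothing is computed until an action such as collect is called:

rdd = sc.parallelize(range(10))
even_squares = rdd.map(lambda x: x * x).filter(lambda x: x % 2 == 0)  # lazy transformations
print(even_squares.collect())  # [0, 4, 16, 36, 64]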

Is it possible to parallelize Python code in spark?

In this situation, it’s possible to use thread pools or Pandas UDFs to parallelize your Python code in a Spark environment. Just be careful about how you parallelize your tasks, and try to also distribute workloads if possible.
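
As a hedged sketch of the Pandas UDF route (the column names and the conversion are made up, and a DataFrame `df` with a temp_c column is assumed), Spark applies the function to batches of the column on each executor in parallel:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def celsius_to_fahrenheit(c: pd.Series) -> pd.Series:
    return c * 9.0 / 5.0 + 32.0

df.withColumn("temp_f", celsius_to_fahrenheit("temp_c")).show()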

How to load image data into spark dataframe?

In Spark 2.3 or later you can use built-in Spark tools to load image data into a Spark DataFrame. In 2.3 the image content is loaded into the image.data column. At the moment this functionality is experimental and lacks the required ecosystem, but it should improve in the future.




2 Answers

You can use rdd.map to load and transform the pictures in parallel and then collect the RDD into a Python list:

import glob

import numpy as np
from PIL import Image

files = glob.glob("E:\\tests\\*.jpg")

file_rdd = spark.parallelize(files)

def image_to_array(path):
    # Runs on the executors: load one image and convert it to a numpy array
    im = Image.open(path)
    data = np.asarray(im)
    return data

array_rdd = file_rdd.map(image_to_array)
result_list = array_rdd.collect()

result_list is now a list with 10 elements; each element is a numpy.ndarray.

The function image_to_array will be executed on different Spark executors in parallel. If you have a multi-node Spark cluster, you have to make sure that all nodes can access E:\\tests\\.

After collecting the arrays, processing can continue with

img = np.array(result_list, dtype=object)
answered Sep 23 '22 by werner


My solution follows the same idea as werner's, but uses only Spark libraries:

from pyspark.ml.image import ImageSchema
import numpy as np


df = (spark
      .read
      .format("image")
      .option("pathGlobFilter", "*.jpg")
      .load("your_data_path"))

df = df.select('image.*')

# Pre-caching the required schema. If you remove this line an error will be raised.
ImageSchema.imageFields

# Transforming images to np.array
arrays = df.rdd.map(ImageSchema.toNDArray).collect()

img = np.array(arrays)
print(img.shape)
answered Sep 24 '22 by Kafels