I have 10 JPEG images in a directory. I want to read all of them in parallel using PySpark. I tried the following:
import glob

import numpy as np
from PIL import Image
from pyspark import SparkContext, SparkConf

conf = SparkConf()
spark = SparkContext(conf=conf)

files = glob.glob("E:\\tests\\*.jpg")
files_ = spark.parallelize(files)

arrs = []
for fi in files_.toLocalIterator():
    im = Image.open(fi)
    data = np.asarray(im)
    arrs.append(data)

img = np.array(arrs)
print(img.shape)
The code ran without error and printed out img.shape; however, it did not run in parallel. Could you help me?
To create a SparkContext you first need to build a SparkConf object that contains information about your application. Only one SparkContext may be active per JVM; you must stop() the active SparkContext before creating a new one. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster. When you create a new SparkContext, at least the master and the app name should be set, either through the named parameters or through conf.
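For example, a minimal local setup could look like this (the master URL local[*] and the app name are illustrative):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("read-images")
spark = SparkContext(conf=conf)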
There are several ways of achieving parallelization in PySpark. Native Spark: if you're using Spark data frames and libraries (e.g. MLlib), then your code will be parallelized and distributed natively by Spark. The RDD is the basic data structure used in PySpark: an immutable collection of objects that is the starting point for a Spark application. The data is computed on different nodes of the Spark cluster, which is what makes the parallel processing happen. For plain Python code, it is also possible to use thread pools or pandas UDFs to parallelize in a Spark environment; just be careful about how you parallelize your tasks, and try to also distribute workloads where possible (see the sketch after this paragraph). Note that your loop did not run in parallel because rdd.toLocalIterator() pulls the elements back to the driver one at a time, so Image.open and np.asarray ran sequentially in the driver process rather than on the executors.
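A minimal driver-side sketch of the thread-pool option (this loads the images concurrently on one machine rather than distributing them across a cluster; the pool size is arbitrary):

import glob
from multiprocessing.pool import ThreadPool

import numpy as np
from PIL import Image

def load_image(path):
    # Each worker thread decodes one image
    return np.asarray(Image.open(path))

with ThreadPool(4) as pool:
    arrs = pool.map(load_image, glob.glob("E:\\tests\\*.jpg"))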
In Spark 2.3 or later you can use built-in Spark tools to load image data into a Spark DataFrame; the image content is loaded into the image.data column. At the moment this functionality is experimental and lacks the required ecosystem, but it should improve in the future.
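A minimal sketch of that image source (assuming Spark 2.4+, where it is exposed as a data source format, and an active SparkSession named spark):

image_df = spark.read.format("image").load("E:/tests")
image_df.printSchema()
# The image column is a struct with fields:
# origin, height, width, nChannels, mode, data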
You can use rdd.map to load and transform the pictures in parallel and then collect the RDD into a Python list:
files = glob.glob("E:\\tests\\*.jpg")
file_rdd = spark.parallelize(files)

def image_to_array(path):
    # Runs on the executors: load one image and convert it to an array
    im = Image.open(path)
    data = np.asarray(im)
    return data

array_rdd = file_rdd.map(image_to_array)
result_list = array_rdd.collect()
result_list is now a list with 10 elements; each element is a numpy.ndarray.
The function image_to_array will be executed on different Spark executors in parallel. If you have a multi-node Spark cluster, you have to make sure that all nodes can access E:\\tests\\.
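If a shared filesystem is not available, one alternative sketch is to let Spark ship the raw bytes itself via binaryFiles (assuming spark is the SparkContext from the question):

import io

binary_rdd = spark.binaryFiles("E:\\tests\\*.jpg")  # RDD of (path, bytes) pairs
array_rdd = binary_rdd.map(lambda kv: np.asarray(Image.open(io.BytesIO(kv[1]))))
result_list = array_rdd.collect()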
After collecting the arrays, processing can continue with
img = np.array(result_list, dtype=object)
The dtype=object allows the collected images to have different dimensions.
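If all 10 images happen to have the same dimensions, they can instead be stacked into a single numeric array:

img = np.stack(result_list)  # shape: (10, height, width, channels)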
My solution follows the same idea as werner's, but uses only Spark libraries:
from pyspark.ml.image import ImageSchema
import numpy as np

# Note: spark must be a SparkSession here, not the SparkContext from the question.
df = (spark
      .read
      .format("image")
      .option("pathGlobFilter", "*.jpg")
      .load("your_data_path"))
df = df.select('image.*')

# Pre-caching the required schema. If you remove this line an error will be raised.
ImageSchema.imageFields

# Transforming images to np.array
arrays = df.rdd.map(ImageSchema.toNDArray).collect()

img = np.array(arrays)
print(img.shape)
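If ImageSchema.toNDArray is not available in your version, a hedged alternative is to decode the image struct manually (this sketch assumes the standard image schema fields and the OpenCV-style BGR channel order used by the image data source; row_to_array is an illustrative helper name):

def row_to_array(row):
    # After select('image.*'), each row has origin, height, width, nChannels, mode, data
    return np.frombuffer(row.data, dtype=np.uint8).reshape(row.height, row.width, row.nChannels)

arrays = df.rdd.map(row_to_array).collect()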