 

How to run independent transformations in parallel using PySpark?

I am trying to run two functions that perform completely independent transformations on a single RDD in parallel using PySpark. What are some ways to do this?

from multiprocessing import Process

from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext

def doXTransforms(sampleRDD):
    pass  # (X transforms)

def doYTransforms(sampleRDD):
    pass  # (Y transforms)

if __name__ == "__main__":
    sc = SparkContext(appName="parallelTransforms")
    sqlContext = SQLContext(sc)
    hive_context = HiveContext(sc)

    rows_rdd = hive_context.sql("select * from tables.X_table")

    # Attempt to run both transformations in separate processes
    p1 = Process(target=doXTransforms, args=(rows_rdd,))
    p1.start()
    p2 = Process(target=doYTransforms, args=(rows_rdd,))
    p2.start()
    p1.join()
    p2.join()
    sc.stop()

This does not work, and I now understand why it won't (the SparkContext cannot be shared with forked worker processes). But is there an alternative way to make this work? Specifically, are there any Python-Spark specific solutions?

asked Jun 27 '16 by preitam ojha



1 Answer

Just use threads and make sure that the cluster has enough resources to process both tasks at the same time.

from threading import Thread
import time

def process(rdd, f):
    # Artificial per-record delay so both jobs stay on the cluster
    # long enough to visibly overlap.
    def delay(x):
        time.sleep(1)
        return f(x)
    return rdd.map(delay).sum()


# Use half the default parallelism per job so the two jobs can share
# the cluster (assumes an existing SparkContext sc).
rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()
t1.join(); t2.join()

Arguably this is not that useful in practice, but otherwise it should work just fine.
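
Note that bare threads discard the return values. If you need the results back in the driver, one option is to submit the same jobs through concurrent.futures instead. A minimal sketch, assuming the same sc, rdd and process function from the snippet above:

from concurrent.futures import ThreadPoolExecutor

# Each submit() runs process() in a driver-side thread, which in turn
# triggers a Spark job; the futures resolve to the computed sums.
with ThreadPoolExecutor(max_workers=2) as pool:
    f1 = pool.submit(process, rdd, lambda x: x * 2)
    f2 = pool.submit(process, rdd, lambda x: x + 1)
    print(f1.result(), f2.result())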

You can further use in-application scheduling with the FAIR scheduler and scheduler pools for better control over the execution strategy.
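
A minimal sketch of wiring that up (the pool name pool1 and the fairscheduler.xml path are illustrative placeholders):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("parallelTransforms")
        .set("spark.scheduler.mode", "FAIR")  # enable FAIR job scheduling
        .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml"))  # optional pool definitions
sc = SparkContext(conf=conf)

# Jobs submitted from the current thread are assigned to this pool.
sc.setLocalProperty("spark.scheduler.pool", "pool1")

Each thread can set its own pool before kicking off its jobs, so the two transformations can get, for example, equal shares of the cluster.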

You can also try pyspark-asyncactions (disclaimer: the author of this answer is also the author of the package), which provides a set of wrappers around the Spark API and concurrent.futures:

import asyncactions  # patches *Async methods onto RDD / DataFrame
import concurrent.futures

# Both counts are submitted immediately and run concurrently.
f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()

[x.result() for x in concurrent.futures.as_completed([f1, f2])]
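
Each *Async method submits the action from a background thread and returns a standard concurrent.futures.Future, so both counts run as separate, concurrently scheduled Spark jobs while the driver thread stays free until result() is called.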
answered Sep 22 '22 by zero323