The Spark documentation says the following about Scheduling Within an Application:
"Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)."
I could find a few code examples of this in Scala and Java. Can somebody give an example of how this can be implemented using PySpark?
Scheduling Within an Application. Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark's scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).
Spark is known for its parallel processing: a DataFrame or a resilient distributed dataset (RDD) is distributed across the worker nodes to gain maximum performance during processing.
If you need to work with many Spark contexts, you can turn on the special option [MultipleContexts] (1), but it is used only for Spark internal tests and is not supposed to be used in user programs. You will get unexpected behavior while running more than one Spark context in a single JVM [SPARK-2243] (2).
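For illustration only, assuming the option referenced above is spark.driver.allowMultipleContexts (an internal testing flag that has been removed in recent Spark versions), a minimal sketch of how it would be set looks like this; the supported pattern remains a single SparkContext shared by all threads.

# Sketch only: spark.driver.allowMultipleContexts is assumed to be the flag
# meant above; it is intended for Spark's own tests and is discouraged in
# user programs (and no longer available in newer releases).
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster('local[*]')
        .setAppName('multiple-contexts-demo')
        .set('spark.driver.allowMultipleContexts', 'true'))  # discouraged
sc = SparkContext(conf=conf)
# Prefer one SparkContext per application, shared across threads.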
I was running into the same issue, so I created a tiny self-contained example. I create multiple threads using Python's threading module and submit multiple Spark jobs simultaneously.
Note that by default, Spark runs the jobs First-In First-Out (FIFO): http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. In the example below, I change it to FAIR scheduling.
# Prereqs: set
#   spark.dynamicAllocation.enabled true
#   spark.shuffle.service.enabled true
#   spark.scheduler.mode FAIR
# in spark-defaults.conf

import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
    print(sc.parallelize(range(i * 10000)).count())

def run_multiple_jobs():
    conf = SparkConf().setMaster('local[*]').setAppName('appname')
    # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
    conf.set('spark.scheduler.mode', 'FAIR')
    sc = SparkContext(conf=conf)
    for i in range(4):
        t = threading.Thread(target=task, args=(sc, i))
        t.start()
        print('spark task', i, 'has started')

run_multiple_jobs()
Output:
spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0
10000
20000
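As a follow-up, with FAIR mode enabled each thread can also be assigned to its own scheduler pool via sc.setLocalProperty. The sketch below is not part of the answer above: the pool names ("pool_0", "pool_1", ...) are made up for illustration, and pools not declared in fairscheduler.xml fall back to default pool settings.

# Minimal sketch: per-thread FAIR scheduler pools with a single shared SparkContext.
import threading
from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster('local[*]').setAppName('fair-pools-demo')
conf.set('spark.scheduler.mode', 'FAIR')
sc = SparkContext(conf=conf)

def task_in_pool(i):
    # setLocalProperty is per-thread, so each thread's jobs land in its own pool
    sc.setLocalProperty('spark.scheduler.pool', 'pool_{}'.format(i))
    print(sc.parallelize(range((i + 1) * 10000)).count())

threads = [threading.Thread(target=task_in_pool, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
sc.stop()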