
How to run concurrent jobs (actions) in Apache Spark using a single Spark context


The Apache Spark documentation says that "within each Spark application, multiple “jobs” (Spark actions) may be running concurrently if they were submitted by different threads". Can someone explain how to achieve this concurrency for the following sample code?

    SparkConf conf = new SparkConf().setAppName("Simple_App");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
    JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");

    System.out.println(file1.count());
    System.out.println(file2.count());

These two jobs are independent and must run concurrently.
Thank you.

Sporty asked Feb 25 '15 06:02



1 Answer

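An action such as count() blocks the calling thread until its job finishes, so submit each action from its own thread. Try something like this: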

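    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // The statements below belong inside a method declared with "throws Exception".
    // "local[2]" runs Spark locally with two worker threads.
    final JavaSparkContext sc = new JavaSparkContext("local[2]", "Simple_App");
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    // Submit job 1 from its own thread
    Future<Long> future1 = executorService.submit(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
            return file1.count();
        }
    });
    // Submit job 2 from a second thread
    Future<Long> future2 = executorService.submit(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");
            return file2.count();
        }
    });
    // Block until job 1 finishes
    System.out.println("File1:" + future1.get());
    // Block until job 2 finishes
    System.out.println("File2:" + future2.get());
    // Release the pool threads (otherwise the JVM may not exit) and stop Spark
    executorService.shutdown();
    sc.stop();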
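
The same pattern can be wrapped in a small helper so that any number of independent actions run from separate threads. The following is only a sketch using the JDK alone: the class name `ConcurrentActions` and the method `runAll` are hypothetical, and the two lambdas in `main` are stand-ins for real Spark actions, not Spark calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not part of Spark.
class ConcurrentActions {

    // Runs each blocking action on its own pool thread and returns the
    // results in the same order as the input list.
    static <T> List<T> runAll(List<Callable<T>> actions) throws Exception {
        // One thread per action; at least one so an empty list is accepted.
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, actions.size()));
        try {
            // invokeAll submits everything and waits until all have finished.
            List<Future<T>> futures = pool.invokeAll(actions);
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                // get() rethrows a failure from the action as ExecutionException.
                results.add(future.get());
            }
            return results;
        } finally {
            // Always release the pool threads so the JVM can exit.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins for blocking Spark actions. With a shared context "sc"
        // these could be, for example,
        //   () -> sc.textFile("/path/to/test_doc1").count()
        List<Callable<Long>> actions = Arrays.asList(
                () -> { Thread.sleep(200); return 10L; },
                () -> { Thread.sleep(200); return 20L; });
        List<Long> counts = runAll(actions);
        System.out.println("File1:" + counts.get(0));
        System.out.println("File2:" + counts.get(1));
    }
}
```

Submitting from separate threads only makes the jobs eligible to run at the same time. How much they actually overlap still depends on the cores available to the application and on Spark's scheduler.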
G Quintana answered Oct 06 '22 08:10