
How to write rows asynchronously in Spark Streaming application to speed up batch execution?

I have a Spark job where I need to write the output of a SQL query every micro-batch. The write is an expensive operation performance-wise and is causing the batch execution time to exceed the batch interval.

I am looking for ways to improve the performance of the write.

  1. Is doing the write action in a separate thread asynchronously like shown below a good option?

  2. Would this cause any side effects because Spark itself executes in a distributed manner?

  3. Are there other/better ways of speeding up the write?

    import java.util.concurrent.Executors

    // Create a fixed thread pool to execute asynchronous tasks
    val executorService = Executors.newFixedThreadPool(2)
    dstream.foreachRDD { rdd =>
      import org.apache.spark.sql._
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
      import spark.implicits._
      import spark.sql

      val records = rdd.toDF("record")
      records.createOrReplaceTempView("records")
      val result = spark.sql("select * from records")

      // Submit an asynchronous task to write the result
      // (`output` is the target path, defined elsewhere)
      executorService.submit {
        new Runnable {
          override def run(): Unit = {
            result.write.parquet(output)
          }
        }
      }
    }
    
vijay asked Jun 01 '17 06:06



2 Answers

1 - Is doing the write action in a separate thread asynchronously like shown below a good option?

No. The key to understanding the issue here is to ask 'who is doing the write'. The write is done by the resources allocated for your job on the executors in a cluster. Placing the write command on an async thread pool is like adding a new office manager to an office with a fixed staff. Will two managers be able to do more work than one alone, given that they have to share the same staff? Well, one reasonable answer is "only if the first manager was not giving them enough work, so there's some free capacity".

Going back to our cluster, we are dealing with a write operation that is heavy on IO. Parallelizing write jobs will lead to contention for IO resources, making each independent job take longer. Initially, our job might look better than the 'single manager version', but trouble will eventually hit us. I've made a chart that attempts to illustrate how that works. Note that parallel jobs take longer in proportion to the amount of time that they overlap in the timeline.

[Figure: sequential vs parallel jobs in Spark Streaming]

Once we reach that point where jobs start getting delayed, we have an unstable job that will eventually fail.
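
To make the instability concrete, here is a minimal sketch in plain Scala (no Spark involved) of how scheduling delay accumulates once each batch takes longer to process than the batch interval. The object name, the function name and the numbers are hypothetical, chosen only to illustrate the arithmetic. It models the sequential case, where one batch job runs at a time.

```scala
object BatchDelaySketch {
  /** Scheduling delay (in seconds) of each batch when a new batch arrives
    * every `intervalSec` and every batch needs `processingSec` to complete.
    * Assumption: batches are processed strictly one after another. */
  def schedulingDelays(intervalSec: Double, processingSec: Double, batches: Int): Vector[Double] = {
    // State: (time at which the previous batch finishes, delays collected so far)
    val (_, delays) = (0 until batches).foldLeft((0.0, Vector.empty[Double])) {
      case ((freeAt, acc), i) =>
        val arrival = i * intervalSec          // batch i becomes available
        val start   = math.max(arrival, freeAt) // it waits for the previous batch
        (start + processingSec, acc :+ (start - arrival))
    }
    delays
  }
}
```

With a 10 s interval and 12 s of processing, `BatchDelaySketch.schedulingDelays(10, 12, 4)` gives delays of 0, 2, 4 and 6 seconds: every batch waits 2 s longer than the one before it. With 8 s of processing the delays stay at 0. Moving the write to a thread pool does not remove this backlog; it only moves it out of the queue that Spark Streaming reports on.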

2 - Would this cause any side effects because Spark itself executes in a distributed manner?

Some effects I can think of:

  • Probably higher cluster load and IO contention.
  • Jobs queue up on the thread pool queue instead of on the Spark Streaming queue. We lose the ability to monitor our job through the Spark UI and monitoring API, as the delays are 'hidden' and all looks fine from the Spark Streaming point of view.

3 - Are there other/better ways of speeding up the write? (ordered from cheap to expensive)

  • If you are appending to a Parquet file, create a new file often. Appending gets expensive with time.
  • Increase your batch interval or use window operations to write larger chunks of Parquet. Parquet likes large files.
  • Tune the partitioning and distribution of your data so that Spark can do the write in parallel.
  • Increase cluster resources; add more nodes if necessary.
  • Use faster storage.
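
As a rough illustration of the first and third suggestions, the sketch below keeps the write synchronous inside `foreachRDD`, writes every micro-batch to its own directory instead of appending to one location, and repartitions the result before writing. The names `SyncWriteSketch`, `batchPath`, `writeEachBatch`, `outputBase` and `writePartitions` are hypothetical and not part of the question's code, and a sensible partition count depends on your cluster and data volume.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

object SyncWriteSketch {
  // Hypothetical helper: one output directory per micro-batch,
  // so no batch ever appends to an existing location.
  def batchPath(outputBase: String, batchTimeMs: Long): String =
    s"$outputBase/batch_time=$batchTimeMs"

  def writeEachBatch(dstream: DStream[String], outputBase: String, writePartitions: Int): Unit =
    dstream.foreachRDD { (rdd, time) =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      val result = rdd.toDF("record")

      // Assumption: `writePartitions` is tuned so that the write uses the
      // available executor cores without producing many tiny files.
      result
        .repartition(writePartitions)
        .write
        .parquet(batchPath(outputBase, time.milliseconds))
    }
}
```

Because the write still runs as part of the batch job, its cost stays visible as processing time in the Spark Streaming UI, which makes the effect of any tuning measurable.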
maasg answered Oct 03 '22 06:10



Is doing the write action in a separate thread asynchronously like shown below a good option?

Yes. It's certainly something to consider when optimizing expensive queries and saving their results to external data stores.

Would this cause any side effects because Spark itself executes in a distributed manner?

Don't think so. SparkContext is thread-safe and promotes this kind of query execution.

Are there other/better ways of speeding up the write?

YES! That's the key to understanding when to use the other (above) options. By default, Spark applications run in FIFO scheduling mode.

Quoting Scheduling Within an Application:

By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are large, then later jobs may be delayed significantly.

Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing, Spark assigns tasks between jobs in a “round robin” fashion, so that all jobs get a roughly equal share of cluster resources. This means that short jobs submitted while a long job is running can start receiving resources right away and still get good response times, without waiting for the long job to finish. This mode is best for multi-user settings.

That means that to make room for executing multiple writes asynchronously and in parallel, you should configure your Spark application to use FAIR scheduling mode (using the spark.scheduler.mode property).

You will have to configure so-called Fair Scheduler Pools to "partition" executor resources (CPU and memory) into pools that you can assign to jobs using the spark.scheduler.pool property.

Quoting Fair Scheduler Pools:

Without any intervention, newly submitted jobs go into a default pool, but jobs’ pools can be set by adding the spark.scheduler.pool "local property" to the SparkContext in the thread that’s submitting them.
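
A minimal sketch of how these two properties could be wired up is shown below. The object and helper names (`FairPoolSketch`, `fairConf`, `inPool`) and the pool name "writes" are hypothetical; only `spark.scheduler.mode`, `spark.scheduler.pool` and `SparkContext.setLocalProperty` come from Spark itself.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FairPoolSketch {
  // Enable fair sharing between jobs of the same application.
  def fairConf(base: SparkConf): SparkConf =
    base.set("spark.scheduler.mode", "FAIR")

  // Run `body` with the calling thread's jobs assigned to `pool`.
  // The property is thread-local, so this must be called from the thread
  // that actually triggers the Spark action (e.g. inside Runnable.run()).
  def inPool[A](sc: SparkContext, pool: String)(body: => A): A = {
    sc.setLocalProperty("spark.scheduler.pool", pool)
    try body
    finally sc.setLocalProperty("spark.scheduler.pool", null) // clear the pool assignment
  }
}
```

In the question's code this would mean wrapping the write inside `run()`, for example `FairPoolSketch.inPool(spark.sparkContext, "writes") { result.write.parquet(output) }`. Pool weights and minimum shares are defined in a separate allocation file; a pool that is not declared there should simply get Spark's default pool settings.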

Jacek Laskowski answered Oct 03 '22 07:10
