
Spark: long delay between jobs

We are running a Spark job that extracts data, performs some expensive data conversion, and writes to several different files. Everything runs fine, but I'm seeing random, long delays between the end of a resource-intensive job and the start of the next job.

In the picture below, the job that was scheduled at 17:22:02 took 15 min to finish, so I expected the next job to be scheduled around 17:37:02. However, the next job was scheduled at 22:05:59, more than 4 hours after the previous job succeeded.

When I dig into the next job's Spark UI, it shows a scheduler delay of less than 1 second, so I'm confused about where this 4-hour delay is coming from.

enter image description here

(Spark 1.6.1 with Hadoop 2)

Updated:

I can confirm that David's answer below is spot on: the way Spark handles I/O operations is a bit unexpected. (It makes sense that a file write essentially does a "collect" behind the curtain before it writes, considering ordering and/or other operations.) But I'm a bit uncomfortable with the fact that I/O time is not included in job execution time. I guess you can see it in the "SQL" tab of the Spark UI, where queries are still running even though all jobs have succeeded, but you cannot drill into it at all.

I'm sure there are more ways to improve this, but the two methods below were sufficient for me:

  1. reduce file count
  2. set parquet.enable.summary-metadata to false
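
The two changes above could be sketched in PySpark roughly as follows. This is a minimal sketch, not the asker's actual code: the function name `write_compact_parquet`, the `num_files` parameter and its default of 16 are hypothetical, `sc` is assumed to be an existing SparkContext and `df` an existing DataFrame. Note that `_jsc` is an internal PySpark handle to the Java context, so treat its use here as an assumption that may not hold across versions.

```python
def write_compact_parquet(sc, df, output_path, num_files=16):
    """Write df as Parquet with fewer output files and no summary metadata.

    sc          -- an existing SparkContext (assumed)
    df          -- an existing DataFrame (assumed)
    output_path -- destination directory
    num_files   -- hypothetical target number of output files
    """
    # Turn off the Parquet summary-metadata files, using the property
    # named in the question. It is a Hadoop-level setting.
    sc._jsc.hadoopConfiguration().set("parquet.enable.summary-metadata", "false")

    # Each partition becomes one output file, so merging partitions
    # before the write reduces the file count.
    df.coalesce(num_files).write.parquet(output_path)
```

A suitable `num_files` depends on data volume; fewer files mean less per-file work at commit time but also less write parallelism.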
jk-kim asked Apr 10 '16 02:04


2 Answers

I/O operations often come with significant overhead that occurs on the master node. Since this work isn't parallelized, it can take quite a bit of time. And since it is not a job, it does not show up in the resource manager UI. Some examples of I/O tasks that are done by the master node:

  • Spark will write to temporary S3 directories, then move the files using the master node
  • Reading of text files often occurs on the master node
  • When writing Parquet files, the master node will scan all the files post-write to check the schema
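
Because this work does not appear as a job, one way to see how long it takes is to time the whole write call from the driver program and compare that with the job durations in the Spark UI. The helper below is only a sketch under that assumption; `wall_clock` and its `log` parameter are hypothetical names, and it uses nothing but the Python standard library.

```python
import time
from contextlib import contextmanager


@contextmanager
def wall_clock(label, log=print):
    """Report the wall-clock time spent in the enclosed block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        log("%s took %.1f s" % (label, elapsed))


# Usage (hypothetical DataFrame `df` and output path):
# with wall_clock("parquet write including commit"):
#     df.write.parquet("s3://some-bucket/output/")
```

If the reported time is much larger than the sum of the job durations shown in the UI, the difference is a rough estimate of the non-parallelized I/O work described above.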

These issues can be solved by tweaking YARN settings or redesigning your code. If you provide some source code, I might be able to pinpoint your issue.

Discussion of writing I/O Overhead with Parquet and s3

Discussion of reading I/O Overhead "s3 is not a filesystem"

David answered Oct 16 '22 07:10


Problem:

I faced a similar issue when writing Parquet data to S3 with PySpark on EMR 5.5.1. All workers would finish writing data to the _temporary directory in the output folder, and the Spark UI would show that all tasks had completed. But the Hadoop Resource Manager UI would neither release the application's resources nor mark it as complete. On checking the S3 bucket, it appeared that the Spark driver was moving the files one by one from the _temporary directory to the output bucket, which was extremely slow, and the whole cluster was idle except for the driver node.

Solution:

The solution that worked for me was to use the committer class provided by AWS (EmrOptimizedSparkSqlParquetOutputCommitter) by setting the configuration property spark.sql.parquet.fs.optimized.committer.optimization-enabled to true.

e.g.:

spark-submit ....... --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true

or

pyspark ....... --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true

Note that this property is available in EMR 5.19 or higher.
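
The same property can presumably also be set from inside a PySpark program rather than on the command line. The sketch below assumes an EMR cluster where the property is supported; `with_emr_optimized_committer` and `COMMITTER_FLAG` are hypothetical names, and `builder` is assumed to be a `SparkSession.builder` object.

```python
# Property name taken from the answer above.
COMMITTER_FLAG = "spark.sql.parquet.fs.optimized.committer.optimization-enabled"


def with_emr_optimized_committer(builder):
    """Return the session builder with the EMR committer property set to "true"."""
    return builder.config(COMMITTER_FLAG, "true")


# Usage on an EMR cluster (assumes pyspark is installed there):
# from pyspark.sql import SparkSession
# spark = with_emr_optimized_committer(
#     SparkSession.builder.appName("example")
# ).getOrCreate()
```

Setting it in code keeps the choice next to the job that depends on it, while the `--conf` form shown above keeps the program portable to clusters where the property does not exist.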

Result:

After running the Spark job on EMR 5.20.0 with the above solution, no _temporary directory was created and all files were written directly to the output bucket, so the job finished very quickly.

For more details:
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html

Abdul Mannan answered Oct 16 '22 07:10