Spark job failing due to space issue

I am writing a batch-processing program in Spark using PySpark. Following are the input files and their sizes:

base-track.dat (3.9g)
base-attribute-link.dat (18g)
base-release.dat (543m)

These are text files with one record per line, and the fields are separated by a special character (see the code below).

I am performing some filtering on the attribute links, grouping them, and joining the result with the other tables.

I am submitting this program via spark-submit to a Hadoop cluster with 9 data nodes managed by Ambari. Each data node has 140 GB of RAM and 3.5 TB of disk space.

Following is my PySpark code:

import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == "__main__":
        sc = SparkContext(appName = "Tracks")
        sqlContext = SQLContext(sc)

        #Load base-track
        track = sc.textFile("base-track/input").map(lambda row: row.split(u'\u0001'))

        #Load base-attribute-link
        attlnk = sc.textFile("base-attribute-link/input").map(lambda row: row.split(u'\u0001'))

        #Load base-release
        release = sc.textFile("base-release/input").map(lambda row: row.split(u'\u0001'))

        # Keep only the links for attribute id 'MA0000000162'
        attlnk = attlnk.filter(lambda row: row[2] == 'MA0000000162')

        # Group the links by object id (field 1) and, for each object,
        # keep the maximum value found in field 4
        attlnkg = attlnk.groupBy(lambda row: row[1])

        attlnkmax = attlnkg.map(lambda t: (t[0], max([v[4] for v in t[1]])))

        # Convert to a DataFrame of (al_objectid, al_value) pairs
        alg = attlnkmax.map(lambda r: Row(al_objectid=r[0], al_value=r[1]))

        aldf = alg.toDF()

        track = track.map(lambda r:Row(t_tag = r[0], t_trackid= r[1], t_releaseid= r[2], t_songid = r[3], t_med= r[4], t_ph = r[5], t_tn = r[5], t_title= r[5], t_part= r[6], t_dur = r[7], t_pick = r[8], t_amgclid  = r[9], t_amgpopid = r[10], t_compid = r[11], t_muzid = r[12], t_perfid= r[13], t_albumid = r[14]))

        trackdf = track.toDF()

        release = release.map(lambda r:Row(r_tag = r[0], r_relid = r[1], r_albumid = r[2], r_mediafmtid = r[3], r_prodfmtid = r[4], r_reldate = r[5], r_prodcode = r[6], r_prodtypeid = r[7], r_label = r[8], r_relyear = r[9], r_ispurch = r[10], r_amgclassid = r[11], r_amgpopid = r[12], r_eanid = r[13], r_upcid = r[14]))

        releasedf = release.toDF()

        # Left-outer join tracks with the aggregated attribute values on track id
        trackaldf = trackdf.join(aldf, trackdf['t_trackid'] == aldf['al_objectid'], 'left_outer')

        # Inner join the result with releases on release id
        tracksdf = trackaldf.join(releasedf, trackaldf['t_releaseid'] == releasedf['r_relid'])

        tracksdf = tracksdf.select('t_trackid', 't_releaseid', 't_songid', 't_med', 't_ph', 't_tn', 't_title', 't_part', 't_dur', 't_pick', 't_amgclid', 't_amgpopid', 't_compid', 't_muzid', 'al_objectid', 't_perfid', 't_albumid', 'r_label')


        # Write the result as '\u0001'-delimited text, coalesced into 100 output files
        tracksdf.rdd.map(lambda x: u"\u0001".join(map(str, x))).coalesce(100).saveAsTextFile("tracks-out")

I got a bunch of the following errors when it tried to execute this:

ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-d88c631e-cec3-4b83-8af6-a38b109b5e3b/0e/temp_shuffle_7dbda3ac-48b1-4c4a-89c7-64eb5d858d90
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:336)
    at org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:209)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.flush(UnsafeRowSerializer.scala:83)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply$mcV$sp(DiskBlockObjectWriter.scala:157)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
    at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:161)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

There are a couple of questions on SO, here and here, related to the same problem.

Here is what I have tried from those two questions. First, I increased spark.yarn.executor.memoryOverhead from 384 MB to 4 GB. Second, I added the following Java options:

SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark,/mnt2/spark -Dhadoop.tmp.dir=/mnt/ephemeral-hdfs"

export SPARK_JAVA_OPTS

The first change did not have any effect. When I added the Java opts, the job failed with an error saying the /mnt directories are not present.

After reading about this problem on multiple forums (including Databricks), I got a vague idea that the job is creating temporary files on /tmp of each cluster node as part of the shuffle and exhausting the space there. On each cluster node we have allocated 100 GB to the root (/) partition, on which /tmp resides.
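For reference, a quick way to see how much space is actually free on the partition holding /tmp on a given node (a minimal sketch, run locally on each machine):

import os

# Free space on the filesystem that holds Spark's default scratch dir (/tmp)
st = os.statvfs("/tmp")
print("free on /tmp: %.1f GB" % (st.f_bavail * st.f_frsize / 1e9))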

I have been struggling for more than a month to get this to run by playing with various Spark configuration parameters. As part of the tweaking, I increased spark.driver.memory and spark.executor.memory to 16g and later to 64g, and also increased spark.yarn.executor.memoryOverhead to 4 GB. Unfortunately, none of this solved the space issue.
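Roughly, those settings correspond to something like this on the SparkConf (a sketch; the same values can equally be passed to spark-submit via --driver-memory, --executor-memory and --conf, which is the more reliable route for the driver memory since the driver JVM is already running by the time in-code configuration is read):

from pyspark import SparkConf, SparkContext

# Sketch only: the memory settings described above, applied before the
# context is created.
conf = (SparkConf()
        .setAppName("Tracks")
        .set("spark.executor.memory", "64g")
        .set("spark.yarn.executor.memoryOverhead", "4096")  # value is in MB
        .set("spark.driver.memory", "64g"))
sc = SparkContext(conf=conf)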

Any guidance on how to proceed further would be of great help.

[Edit-1] I just checked the disk space of the root partitions on all the machines. 7 of the 9 nodes in our cluster have 100+ GB allocated to the root partition, but on 2 nodes only 10 GB is allocated, of which only about 6 GB is free. This might be causing the disk space issue; I will have to check with our IT team whether the root partition can be extended.

[Edit-2] I worked with the IT team to extend the root partition to 100+ GB on all the machines, but the issue still persists. Maybe even 100 GB of /tmp space is not sufficient for this job; I estimated its output to be roughly 4.6 GB.

asked Jun 21 '17 by Ravi Chandra

1 Answer

Given the nature of your error, and the fact that you are performing large joins on tens of GB of data in which the Spark workers write intermediate data to disk as they shuffle, a 100 GB disk does not seem to be enough. I recommend allocating a lot more disk for the default worker_dir and local_dirs, either by mounting them on larger disks or by provisioning a much larger root disk.

Also, note that if Spark does not shut down properly, this intermediate data may linger and take up a lot of space on the worker nodes, so you may have to inspect those directories and remove any stale files.

If you are running spark-standalone on an AWS r3, c3, or similar instance type with large ephemeral SSD disks, I recommend mounting those disks at, say, /mnt and /mnt2 and configuring Spark's scratch space to point to those mounts instead of the (usually) smaller root volume, e.g.:

SPARK_LOCAL_DIRS=/mnt
SPARK_WORKER_DIR=/mnt2
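If it is more convenient to keep this in the application or in spark-defaults.conf rather than in the environment, the same idea can be expressed as Spark configuration. A minimal sketch, assuming the /mnt and /mnt2 mounts exist (note that on YARN the executors take their scratch directories from the NodeManager's yarn.nodemanager.local-dirs rather than from spark.local.dir):

from pyspark import SparkConf, SparkContext

# Sketch: point Spark's shuffle/scratch space at the larger mounts instead of /tmp.
# Honoured in local and standalone mode; on YARN, configure the NodeManager's
# yarn.nodemanager.local-dirs to the larger disks instead.
conf = (SparkConf()
        .setAppName("Tracks")
        .set("spark.local.dir", "/mnt/spark,/mnt2/spark"))
sc = SparkContext(conf=conf)

As for stale scratch data left behind by crashed runs, the leftover directories look like the blockmgr-* path in the error above. A rough cleanup sketch (run on each worker while no jobs are running, and only if you are sure the directories are stale):

import glob, shutil

# Hypothetical cleanup: remove leftover Spark scratch directories from previous
# runs. Verify the paths first -- live jobs also use directories like these.
for d in glob.glob("/tmp/spark-*") + glob.glob("/tmp/blockmgr-*"):
    shutil.rmtree(d, ignore_errors=True)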
answered Oct 07 '22 by Brian Cajes