Spark job just hangs with large data

I am trying to query 15 days of data from S3. Querying each day separately works fine, and it also works fine for 14 days. But when I query all 15 days, the job keeps running forever (hangs) and the task count stops updating.

My settings :

I am using a 51-node cluster of r3.4xlarge instances with dynamic allocation and maximizeResourceAllocation turned on.

All I am doing is:

// Joda-Time is assumed for the date handling below; DateUtils is a project helper.
import org.joda.time.Days
import org.joda.time.format.DateTimeFormat

val startTime = "2017-11-21T08:00:00Z"
val endTime   = "2017-12-05T08:00:00Z"

val start = DateUtils.getLocalTimeStamp( startTime )
val end   = DateUtils.getLocalTimeStamp( endTime )

val days: Int = Days.daysBetween( start, end ).getDays

// Build one glob per day: <input_path>yyyy/MM/dd/*/*
val files: Seq[String] = (0 to days)
      .map( start.plusDays )
      .map( d => s"$input_path${DateTimeFormat.forPattern( "yyyy/MM/dd" ).print( d )}/*/*" )

sqlSession.sparkContext.textFile( files.mkString( "," ) ).count

When I run the same query for 14 days, I get a count of 197337380, and running the 15th day separately gives 27676788. But when I query all 15 days together, the job hangs.

Update:

The job works fine with:

var df = sqlSession.createDataFrame(sc.emptyRDD[Row], schema)

for (n <- files) {
  val tempDF = sqlSession.read.schema( schema ).json(n)
  df = df.union(tempDF)   // append each day's data to the accumulated DataFrame
}

df.count

But can someone explain why it works now but not before?

UPDATE: After setting mapreduce.input.fileinputformat.split.minsize to 256 GB, it works fine now.
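For reference, a minimal sketch of how that Hadoop property can be set from the Spark side (the byte conversion below assumes the 256 GB figure above; adjust to whatever value worked for you):

// Sketch only: raise the minimum input split size on the Hadoop configuration
// before reading; 256 GB expressed in bytes is an assumption based on the note above.
sqlSession.sparkContext.hadoopConfiguration.set(
  "mapreduce.input.fileinputformat.split.minsize",
  (256L * 1024 * 1024 * 1024).toString
)

sqlSession.sparkContext.textFile( files.mkString( "," ) ).count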

asked Dec 06 '17 by user3407267

1 Answer

Dynamic allocation and maximizeResourceAllocation are different settings; one is disabled when the other is active. With maximizeResourceAllocation on EMR, one executor is launched per node, and it is allocated all of that node's cores and memory.

I would recommend taking a different route. You have a pretty big cluster with 51 nodes; I am not sure it is even required. However, start with this rule of thumb and you will get the hang of how to tune these configurations.

  • Cluster memory - minimum of 2X the data you are dealing with.

Now assuming 51 nodes is what you require, try the below (a config sketch with these values follows the list):

  • r3.4xlarge has 16 vCPUs - leave one for the OS and other processes, which gives you 15 usable cores per node.
  • Set your number of executors to 150 - this allocates 3 executors per node.
  • Set the number of cores per executor to 5 (3 executors × 5 cores = 15 cores per node).
  • Set your executor memory to roughly total host memory / 3 ≈ 35G.
  • You need to control the parallelism (default partitions); set it to roughly the total number of cores you have, ~800.
  • Adjust shuffle partitions - make this twice the number of cores, i.e. 1600.
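
A minimal Scala sketch of these settings (the app name and the dynamic-allocation line are my assumptions; fixed executor counts only take effect when dynamic allocation is off):

import org.apache.spark.sql.SparkSession

// Sketch only - values follow the rule of thumb above and are assumptions
// for ~50 r3.4xlarge worker nodes; tune them against your own data volume.
val sqlSession = SparkSession.builder()
  .appName("s3-daily-count")                              // assumed name
  .config("spark.dynamicAllocation.enabled", "false")     // fixed executor counts need this off
  .config("spark.executor.instances", "150")              // ~3 executors per node
  .config("spark.executor.cores", "5")
  .config("spark.executor.memory", "35g")
  .config("spark.default.parallelism", "800")             // ~ total cores
  .config("spark.sql.shuffle.partitions", "1600")         // ~2x total cores
  .getOrCreate()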

The above configurations have been working like a charm for me. You can monitor resource utilization in the Spark UI.

Also, in your YARN config file /etc/hadoop/conf/capacity-scheduler.xml, set yarn.scheduler.capacity.resource-calculator to org.apache.hadoop.yarn.util.resource.DominantResourceCalculator, which will let Spark really go full throttle with those CPUs. Restart the YARN service after the change.
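The property looks like this in capacity-scheduler.xml (snippet only; the rest of the file stays as it is):

<!-- /etc/hadoop/conf/capacity-scheduler.xml: consider both CPU and memory when scheduling -->
<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>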

answered Sep 20 '22 by rohitkulky