
Why would Spark choose to do all work on a single node?

I am having difficulty with a Spark job that, about half the time, will choose to process all data on a single node, which then runs out of memory and dies.

Question: How can I ensure that this happens none of the time?

The system uses Spark 1.6.0 on YARN, pulling from a Hadoop 2.6 datastore, with all the code written in Java. Resources are allocated dynamically across a cluster of a dozen-ish nodes (on Amazon).

The DAG is relatively simple:

RDD --> mapToPair \  
                   coGroup --> flatMapToPair --> reduceByKey --> save
RDD --> mapToPair /
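
In Java, the pipeline looks roughly like the sketch below. This is a minimal stand-in against the Spark 1.6 Java API: the input RDDs (leftRdd and rightRdd as JavaRDD<String>), the key extraction, the per-pair work, and the output path are all illustrative, not the actual job.

    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;
    import java.util.ArrayList;
    import java.util.List;

    // leftRdd and rightRdd are JavaRDD<String> loaded elsewhere (illustrative).
    JavaPairRDD<String, String> left = leftRdd.mapToPair(line ->
            new Tuple2<>(line.split(",", 2)[0], line));
    JavaPairRDD<String, String> right = rightRdd.mapToPair(line ->
            new Tuple2<>(line.split(",", 2)[0], line));

    // coGroup
    JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> grouped =
            left.cogroup(right);

    // flatMapToPair: Spark 1.6's PairFlatMapFunction returns an Iterable
    // (2.x changed this to an Iterator).
    JavaPairRDD<String, Integer> expanded = grouped.flatMapToPair(t -> {
        List<Tuple2<String, Integer>> out = new ArrayList<>();
        for (String l : t._2()._1()) {
            for (String r : t._2()._2()) {
                out.add(new Tuple2<>(t._1(), l.length() + r.length())); // stand-in work
            }
        }
        return out;
    });

    // reduceByKey --> save
    expanded.reduceByKey((a, b) -> a + b)
            .saveAsTextFile("hdfs:///path/to/output"); // hypothetical output path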

When it runs correctly, all tasks get well-distributed across the cluster and the whole job takes on the order of 20 minutes. We will call this "good behavior". Sometimes, however, the flatMapToPair stage effectively runs in a single executor. We will call this "bad behavior".

When I load up the Spark UI for a "bad behavior" job and drill into the flatMapToPair stage, I see that, in fact, there are about 3-4 executors running on each node (same as in the "good behavior" case). However, all but one finish in a fraction of a second, and the remaining executor runs for tens of minutes before it gets killed by YARN for exceeding memory limits.

Things I have already tried:

  1. The web. Searches for things like "spark run on one node" and variations nearly universally lead to folks running in local mode in the spark shell or similar configuration issues. Given that I get good behavior at least some of the time, those kinds of configuration issues seem unlikely (and I have checked that I'm not accidentally in local mode, I have ~100 partitions, ...).

  2. Other Spark jobs run on the same cluster behave well. This would seem to rule out some cluster-wide misconfiguration (heck, even this job runs well sometimes).

  3. Cluster utilization does not seem to affect whether I get the good behavior or the bad behavior. I have seen both behaviors both when the cluster is heavily utilized and when the cluster has nothing else running at all.

  4. It doesn't seem like a YARN issue since the executors all get well-distributed across the cluster. I could, of course, be wrong about that, but it really seems the issue is work distribution between the executors.

  5. There is more than one key in the dataset. I have inserted a countByKey between the coGroup and flatMapToPair and printed the results (for the 20 or so most populous keys). The data was quite evenly distributed among these top keys.
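
One way to check the per-key distribution at that point (a sketch of the idea, not necessarily the exact countByKey code that was used) is to measure how many values each key carries into flatMapToPair and print the heaviest keys. Here grouped is the illustrative cogroup output from the sketch above:

    // Uses the same imports as the earlier sketch (JavaPairRDD, Tuple2, List).
    // Count the grouped values per key...
    JavaPairRDD<String, Integer> perKeySize = grouped.mapValues(v -> {
        int n = 0;
        for (String ignored : v._1()) n++;
        for (String ignored : v._2()) n++;
        return n;
    });

    // ...then pull just the 20 largest back to the driver and print them.
    JavaPairRDD<Integer, String> byCount = perKeySize
            .mapToPair(t -> new Tuple2<>(t._2(), t._1())); // (count, key)
    for (Tuple2<Integer, String> entry : byCount.sortByKey(false).take(20)) {
        System.out.println(entry._2() + " -> " + entry._1());
    }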

Things I have tried in response to comments:

  1. Repartition the RDD right before the flatMapToPair call to force 500 partitions (sketched after this list). This only moved the bad behavior to the repartition stage.

  2. Increase the default parallelism. I do get more partitions this way, but the bad behavior remains at the flatMapToPair stage.

  3. Strip down the data (actually I did a lot of this before posting, but failed to include it in the original list). We only have a few tens of GB and I'm already loading the bare minimum data that I need.
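
For reference, the first two attempts above correspond roughly to the following, applied to the illustrative grouped RDD from the earlier sketch (the partition count is the 500 mentioned above; the parallelism value is illustrative):

    // 1. Explicit repartition just before flatMapToPair (this only moved the
    //    skew to the repartition stage):
    JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> reshuffled =
            grouped.repartition(500);

    // 2. Raise the default parallelism at submit time:
    //    spark-submit --conf spark.default.parallelism=500 ...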

This has been a "fun" little heisenbug with the bad behavior going away after adding debug logging, then staying gone after removing the logging, only to reappear some time later. I'm out of ideas, so if anyone has even some recommended diagnostic steps, I am all ears.

Asked Jan 28 '19 by Aubrey da Cunha

People also ask

What is a Single Node cluster in Spark?

A Single Node cluster is a cluster consisting of an Apache Spark driver and no Spark workers. A Single Node cluster supports Spark jobs and all Spark data sources, including Delta Lake. A Standard cluster requires a minimum of one Spark worker to run Spark jobs.

Does Spark need to be installed on all nodes?

No, it is not necessary to install Spark on all the nodes. Since Spark runs on top of YARN, it uses YARN to execute its commands across the cluster's nodes, so you only need to install Spark on one node.

Can you run Spark on a single machine?

Spark can run in local mode on a single machine, or in cluster mode across multiple machines in a distributed setup. Local mode is ideal for learning Spark installation and application development; all you need is a laptop or a cloud instance such as AWS EC2.
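
As a minimal illustration of local mode in Java (the app name and data are placeholders):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import java.util.Arrays;

    // Local mode: driver and executors share one JVM on this machine,
    // with one worker thread per core ("local[*]").
    SparkConf conf = new SparkConf()
            .setAppName("local-mode-example") // illustrative app name
            .setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    System.out.println(sc.parallelize(Arrays.asList(1, 2, 3, 4)).count());
    sc.stop();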


1 Answer

I ran into something very similar, and while I am not entirely satisfied with the solution because I can't quite explain why it works, it does seem to work. In my case, it was after a shuffle, and the size of the shuffled data was rather small. The problem was that subsequent calculations significantly increased the size of the data, to the point where doing those calculations on 1 or 2 executors became a bottleneck. My best guess is that it relates to a heuristic involving the preferred locations of the data source and the target partition size, maybe combined with not being aware of the expansion taking place in later stages.

I was able to get a consistent, well-distributed shuffle by adding a coalesce(totalCores), where totalCores is defined as spark.executor.instances x spark.executor.cores. It also seemed to work with larger multiples of totalCores, but in my case I didn't need any more parallelism. Note that it may be necessary to use repartition instead of coalesce, depending on the use case. Also, this was on Spark 2.2.1, for reference.
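
A rough Java sketch of that workaround, where shuffled stands for whatever pair RDD comes out of the shuffle and sc is the existing JavaSparkContext (both names illustrative):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;

    // One partition per core, computed from the standard executor settings.
    SparkConf conf = sc.getConf();
    int totalCores = conf.getInt("spark.executor.instances", 1)
                   * conf.getInt("spark.executor.cores", 1);

    // Coalesce the post-shuffle RDD down to that count. coalesce never increases
    // the partition count; use repartition(totalCores) if more are needed.
    JavaPairRDD<String, String> balanced = shuffled.coalesce(totalCores);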

Answered Nov 15 '22 by Brad LaVigne