
Spark Container & Executor OOMs during `reduceByKey`

I'm running a Spark job on Amazon EMR in client mode with YARN, using PySpark, to process data from two input files totaling 200 GB in size.

The job merges the data by key (using reduceByKey), applies some maps and filters, and saves the result to S3 in Parquet format. While the job uses DataFrames for saving, all of our actual transformations and actions are performed on RDDs.

Note: after the "Failures" section I've included a detailed rundown of my current configuration and the values I've already experimented with.

Code

The code relevant to the failures we're seeing takes place in the reduceByKey step. I've included a few lines of context to show one prior map function and the save operations which actually trigger the reduceByKey on the RDD:

# Populate UC Property Type
united_rdd = united_rdd.map(converter.convert_uc_property_type(uc_property_type_mappings))

# Reduce by listingIdSha
united_rdd = united_rdd.reduceByKey(converter.merge_listings)

# Filter by each geoId and write the output to storage
schema = convert_struct(ListingRevision)
for geo in GEO_NORMALIZATION_ENABLED_GEOS:
  regional_rdd = (united_rdd.filter(lambda (id_sha, (listing_revision, geo_id)): geo_id == geo)
                            .map(lambda (id_sha, (listing_revision, geo_id)):
                                 listing_revision))
  regional_df = regional_rdd.map(lambda obj: thrift_to_row(obj, schema)).toDF(schema)
  # Write to Disk/S3
  regional_df.write.format(output_format).mode("overwrite").save(os.path.join(output_dir, geo))
  # Write to Mongo
  (regional_df.write.format("com.mongodb.spark.sql.DefaultSource")
                    .option("spark.mongodb.output.uri", mongo_uri)
                    .option("collection",
                            "{}_{}".format(geo, config.MONGO_OUTPUT_COLLECTION_SUFFIX))
                    .mode("overwrite").save())

Failures

The described job fails due to executors running out of physical memory. Multiple executors experience this failure, but here's one example printed in the EMR step's stderr as well as displayed in the Spark History server UI:

 Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2787 in stage 3.0 failed 4 times,
 most recent failure: Lost task 2787.3 in stage 3.0 (TID 5792, ip-10-0-10-197.ec2.internal): 
 ExecutorLostFailure (executor 47 exited caused by one of the running tasks) 
 Reason: Container killed by YARN for exceeding memory limits. 20.0 GB of 20 GB physical memory used. 
 Consider boosting spark.yarn.executor.memoryOverhead.
 Driver stacktrace:
   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
   at scala.Option.foreach(Option.scala:257)
   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1923)
   at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143)
   ... 29 more

After discovering this, I dug deeper into the individual nodes' YARN and container logs and found a java.lang.OutOfMemoryError in the container logs along with the YARN log message showing the physical memory usage spike (both included below, in that order).

Java OutOfMemoryError from the Container Logs:

17/03/28 21:41:44 WARN TransportChannelHandler: Exception in connection from ip-10-0-10-70.ec2.internal/10.0.10.70:7337
java.lang.OutOfMemoryError: Direct buffer memory
  at java.nio.Bits.reserveMemory(Bits.java:693)
  at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
  at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
  at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)
  at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)
  at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)
  at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
  at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
  at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
  at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
  at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
  at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
  at java.lang.Thread.run(Thread.java:745)

YARN's Recognition of Extreme Physical Memory Usage:

2017-03-28 21:42:48,986 INFO   org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 6310 for container-id container_1490736006967_0001_01_000015: 20.3 GB of 20 GB physical memory used; 24.9 GB of 100 GB virtual memory used

In summary, I seem to be running out of memory during a shuffle despite allocating over half of each executor's memory to off-heap space and experimenting with a wide range of executor memory settings and core counts. Am I missing anything else I can try? Based on a few other helpful posts I've read, these are the most common culprits for physical memory issues. Could data skew be causing this? I've measured the partition distribution for smaller subsets of the data and it looked normal, but I can't do the same for the full dataset because this job never finishes.
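For reference, this is roughly how I've been checking the distribution on smaller subsets (a minimal sketch; the 1% sample fraction and the top-20 cutoff are arbitrary, and united_rdd is the keyed RDD from the code above):

# Spot-check key skew on a small sample of the keyed RDD.
sampled_counts = (united_rdd.sample(withReplacement=False, fraction=0.01)
                            .map(lambda kv: (kv[0], 1))
                            .reduceByKey(lambda a, b: a + b))

# Largest keys first; a handful of keys dwarfing the rest suggests skew.
for key, count in sampled_counts.takeOrdered(20, key=lambda kv: -kv[1]):
    print(key, count)

# Rough per-partition record counts, to see whether a few partitions
# end up far larger than the rest after the shuffle.
partition_sizes = united_rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print(max(partition_sizes), min(partition_sizes))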

Configuration

EMR Spark Submit Command:

spark-submit \
   --deploy-mode client /home/hadoop/src/python/uc_spark/data_sources/realtytrac/assr_hist_extractor.py \
   --dataset_settings development \
   --mongo_uri <Internal Mongo URI> \
   --aws_access_key_id <S3 Access Key> \
   --aws_secret_key <S3 Secret Key> \
   --listing_normalization_server <Internal EC2 Address>:9502

Relevant Spark Environment Configurations:

spark.executor.memory - 8 GB (out of 20 GB of available memory per executor)
spark.yarn.executor.memoryOverhead - 12 GB
spark.executor.cores - 1 (the lowest I've tried, in the hope that it would work)
spark.default.parallelism - 1024 (automatically configured based on other parameters; I've also tried 4099 to no avail)
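However these are actually set on the cluster (EMR configuration vs. spark-defaults), the equivalent --conf form on the spark-submit above would look roughly like this. This is just a sketch of the current values, not a recommendation; note that spark.yarn.executor.memoryOverhead takes its value in MiB, hence 12288:

spark-submit \
   --deploy-mode client \
   --conf spark.executor.memory=8g \
   --conf spark.yarn.executor.memoryOverhead=12288 \
   --conf spark.executor.cores=1 \
   --conf spark.default.parallelism=1024 \
   /home/hadoop/src/python/uc_spark/data_sources/realtytrac/assr_hist_extractor.py \
   <application arguments as above>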

I'm running with 64 m3.2xlarge machines, totaling 1.41 TB of memory.

NOTE: I've experimented with a wide range of values for every memory parameter except driver memory, with no luck.

Update 1

I refactored my code to combine the two input files with a DataFrame join instead of an RDD union (roughly sketched at the end of this update). Once I did this, I made two important discoveries:

A rightOuter join, as opposed to our default leftOuter join, fixes the problem but reduces our output size. Given this, I'm fairly certain a small subset of skewed data is being excluded by the rightOuter join. Unfortunately, I need to investigate further to see whether the lost data matters; we're still exploring.

Using DataFrames surfaced a clearer failure earlier in the process:

FetchFailed(BlockManagerId(80, ip-10-0-10-145.ec2.internal, 7337), shuffleId=2, mapId=35, reduceId=435, message=
org.apache.spark.shuffle.FetchFailedException: Too large frame: 3095111448
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
  at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
  at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
  at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.IllegalArgumentException: Too large frame: 3095111448
  at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
  at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
  at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
  at java.lang.Thread.run(Thread.java:745)

)

We're failing during a shuffle because a single partition tries to fetch too much data: a 3 GB "frame", well over the roughly 2 GB limit on a single shuffle block that Spark's network layer can transfer.

I'll spend the rest of the day exploring how to de-skew our data and whether we can do a leftOuter join.
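For context, the DataFrame refactor is roughly the following (a sketch rather than our exact code; the tiny example DataFrames and the join column name listing_id_sha are stand-ins):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stand-ins for the two input datasets, keyed on the listing id sha.
left_df = spark.createDataFrame([("sha1", "a"), ("sha2", "b")],
                                ["listing_id_sha", "left_val"])
right_df = spark.createDataFrame([("sha1", "x"), ("sha3", "y")],
                                 ["listing_id_sha", "right_val"])

# Our default: keep every row from the left side.
left_joined = left_df.join(right_df, on="listing_id_sha", how="left_outer")

# The variant that currently succeeds, at the cost of dropping left-only
# rows -- which is why we suspect those rows carry the skewed keys.
right_joined = left_df.join(right_df, on="listing_id_sha", how="right_outer")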


1 Answer

In case anyone else runs into this: the problem turned out to be data skew. I discovered it by switching our initial combining of the two input files from an RDD union to a DataFrame join, which produced a more understandable error showing that our shuffle was failing while trying to fetch data. To solve it, I partitioned our data on an evenly distributed key, and then everything worked.
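For anyone wanting the concrete shape of that fix, it was roughly the following (a sketch, not our exact code; the example DataFrame and the choice of listing_id_sha as the evenly distributed key are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stand-in for the combined listings DataFrame; listing_id_sha is a hash,
# so its values spread records evenly across hash partitions.
listings_df = spark.createDataFrame([("sha1", "geo_a"), ("sha2", "geo_b")],
                                    ["listing_id_sha", "geo_id"])

# Explicitly hash-partition on the evenly distributed key before the
# expensive shuffle so no single partition receives most of the data.
repartitioned_df = listings_df.repartition(1024, "listing_id_sha")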
