I'm running a Spark job on Amazon EMR in client mode with YARN, using PySpark, to process data from two input files totaling 200 GB in size.
The job joins the data together (using reduceByKey), does some maps and filters, and saves the result to S3 in Parquet format. While the job uses DataFrames for saving, all of our actual transformations and actions are performed on RDDs.
Note: after the "Failures" section, I've included a detailed rundown of my current configuration and the values I've already experimented with.
The code relevant to the failures we're seeing takes place in the reduceByKey step. I've included a few lines of context to show one prior map function and the save operations that actually trigger the reduceByKey on the RDD:
# Populate UC Property Type
united_rdd = united_rdd.map(converter.convert_uc_property_type(uc_property_type_mappings))
# Reduce by listingIdSha
united_rdd = united_rdd.reduceByKey(converter.merge_listings)
# Filter by each geoId and write the output to storage
schema = convert_struct(ListingRevision)
for geo in GEO_NORMALIZATION_ENABLED_GEOS:
    regional_rdd = (united_rdd
                    .filter(lambda (id_sha, (listing_revision, geo_id)): geo_id == geo)
                    .map(lambda (id_sha, (listing_revision, geo_id)): listing_revision))
    regional_df = regional_rdd.map(lambda obj: thrift_to_row(obj, schema)).toDF(schema)
    # Write to Disk/S3
    regional_df.write.format(output_format).mode("overwrite").save(os.path.join(output_dir, geo))
    # Write to Mongo
    (regional_df.write.format("com.mongodb.spark.sql.DefaultSource")
                .option("spark.mongodb.output.uri", mongo_uri)
                .option("collection", "{}_{}".format(geo, config.MONGO_OUTPUT_COLLECTION_SUFFIX))
                .mode("overwrite").save())
The described job fails due to executors running out of physical memory. Multiple executors experience this failure, but here's one example printed in the EMR step's stderr as well as displayed in the Spark History server UI:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2787 in stage 3.0 failed 4 times,
most recent failure: Lost task 2787.3 in stage 3.0 (TID 5792, ip-10-0-10-197.ec2.internal):
ExecutorLostFailure (executor 47 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 20.0 GB of 20 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1923)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143)
... 29 more
After discovering this, I dug deeper into the individual node's YARN and container logs and found a java.lang.OutOfMemoryError in the container logs along with the YARN log message recording the physical memory spike (both included below).
Java OutOfMemoryError from the container logs:
17/03/28 21:41:44 WARN TransportChannelHandler: Exception in connection from ip-10-0-10-70.ec2.internal/10.0.10.70:7337
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:693)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
YARN's Recognition of Extreme Physical Memory Usage:
2017-03-28 21:42:48,986 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 6310 for container-id container_1490736006967_0001_01_000015: 20.3 GB of 20 GB physical memory used; 24.9 GB of 100 GB virtual memory used
In summary, I seem to be running out of memory during a shuffle despite allocating over half of each executor's memory to off-heap space and experimenting with a wide range of executor memory and core settings. Am I missing anything else I can try? Based on a few other helpful posts I've read (for example), those settings are the most common culprits for physical memory issues. Could data skew be causing this? I've measured the partition distribution for smaller subsets of the data and it looked normal, but I can't do the same for the full dataset because the job never finishes.
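For reference, here's a minimal sketch of one way to measure partition sizes on the keyed RDD (names taken from the code above; this is illustrative, not the exact check I ran):

# Count records per partition without materializing whole partitions in memory.
partition_sizes = (united_rdd
                   .mapPartitions(lambda it: [sum(1 for _ in it)])
                   .collect())
print("partitions:", len(partition_sizes),
      "largest:", max(partition_sizes),
      "smallest:", min(partition_sizes))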
EMR Spark Submit Command:
spark-submit \
--deploy-mode client /home/hadoop/src/python/uc_spark/data_sources/realtytrac/assr_hist_extractor.py \
--dataset_settings development \
--mongo_uri <Internal Mongo URI> \
--aws_access_key_id <S3 Access Key> \
--aws_secret_key <S3 Secret Key> \
--listing_normalization_server <Internal EC2 Address>:9502
Relevant Spark Environment Configurations:
spark.executor.memory - 8 GB (out of 20 GB of available memory per executor)
spark.yarn.executor.memoryOverhead - 12 GB
spark.executor.cores - 1 (the lowest I've tried, in the hope that it would work)
spark.default.parallelism - 1024 (automatically configured based on other parameters; I've tried 4099 to no avail)
I'm running with 64 m3.2xlarge machines, totaling 1.41 TB of memory.
NOTE: I've experimented with wide-ranging values for all memory parameters except driver memory, with no luck.
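For completeness, the settings above are equivalent to the following SparkConf sketch (shown this way purely for readability; note that spark.yarn.executor.memoryOverhead is specified in megabytes):

from pyspark import SparkConf

# The executor memory layout described above, expressed as configuration values.
conf = (SparkConf()
        .set("spark.executor.memory", "8g")                  # on-heap executor memory
        .set("spark.yarn.executor.memoryOverhead", "12288")  # off-heap overhead, in MB (12 GB)
        .set("spark.executor.cores", "1")
        .set("spark.default.parallelism", "1024"))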
UPDATE: I refactored my code to use DataFrame's join instead of RDD's union to combine the two input files (a sketch of the join appears after the stack trace below). Once I did this, I made two important discoveries:

1. A rightOuter join, as opposed to our default leftOuter join, reduces our output size but fixes the problem. Given this, I'm fairly certain we have a small subset of skewed data that the rightOuter join excludes. Unfortunately, I need to do more investigation to see whether the lost data matters; we're still exploring.

2. Using DataFrames surfaced a clearer failure earlier in the process:
FetchFailed(BlockManagerId(80, ip-10-0-10-145.ec2.internal, 7337), shuffleId=2, mapId=35, reduceId=435, message=
org.apache.spark.shuffle.FetchFailedException: Too large frame: 3095111448
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.IllegalArgumentException: Too large frame: 3095111448
at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
)
We're failing during a shuffle due to a single partition fetching too much data, a 3 GB "frame".
I'll spend the rest of the day exploring how to de-skew our data and whether we can do a leftOuter join.
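For context, the DataFrame-based combine is roughly the following (the DataFrame and column names are illustrative stand-ins, not our actual schema):

# Illustrative sketch: combine the two inputs with a DataFrame join instead of an RDD union.
# "left_df", "right_df", and "listing_id_sha" stand in for our actual DataFrames and key column.
combined_df = left_df.join(right_df, on="listing_id_sha", how="left_outer")

# The variant that avoided the failure, at the cost of dropping unmatched left-side rows:
combined_df = left_df.join(right_df, on="listing_id_sha", how="right_outer")

The right_outer variant only sidesteps the skew by dropping the rows that carry it, which is why we'd still prefer to fix the distribution and keep the leftOuter join.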
In case anyone else discovers this: the problem turned out to be data skew. I found this by switching our initial combining of the two input files from an RDD union to a DataFrame join, which produced a more understandable error showing that our shuffle failed while trying to retrieve data. To solve it, I partitioned our data around an evenly distributed key, and then everything worked.
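A sketch of that fix (the partition count and key column are illustrative; the point is that the chosen key's values are evenly spread):

# Repartition on an evenly distributed key before the wide operations so that no single
# shuffle partition has to fetch a multi-GB block.
combined_df = combined_df.repartition(1024, "listing_id_sha")

Checking the largest and smallest partition sizes afterward (as in the earlier snippet) is a quick way to confirm the skew is gone.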