 

spark executor out of memory in join and reduceByKey

In Spark 2.0, I have two DataFrames that I need to join first and then aggregate with reduceByKey. I always get OOM errors in the executors. Thanks in advance.

Data

d1 (1G, 500 million rows, cached, partitioned by col id2)

id1 id2
1   1
1   3
1   4
2   0
2   7
...

d2 (160G, 2 million rows, cached, partitioned by col id2, value col contains a list of 5000 float numbers)

id2   value
0     [0.1, 0.2, 0.0001, ...]
1     [0.001, 0.7, 0.0002, ...]
...

Now I need to join the two tables to get d3, which I do with spark.sql:

d3 = spark.sql("""
    SELECT d1.id1, d2.value
    FROM d1 JOIN d2 ON d1.id2 = d2.id2
""")

Then I do a reduceByKey on d3 to aggregate the values for each id1 in table d1:

import numpy

# Sum the vectors per id1, then scale each sum to unit L1 norm.
d4 = d3.rdd.reduceByKey(lambda x, y: numpy.add(x, y)) \
           .mapValues(lambda x: (x / numpy.linalg.norm(x, 1)).tolist()) \
           .toDF()
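For reference, here is a local, pure-Python sketch (no Spark involved) of what the reduceByKey/mapValues step is meant to compute: the vectors are summed element-wise per id1, then each sum is divided by its L1 norm, which is what numpy.linalg.norm(x, 1) returns for a 1-D array. The function name aggregate_and_normalize is hypothetical and only for illustration.

```python
def aggregate_and_normalize(pairs):
    """pairs: iterable of (id1, vector) tuples, as produced by the join.

    Returns {id1: vector summed element-wise and scaled to unit L1 norm}.
    """
    sums = {}
    for key, vec in pairs:
        if key in sums:
            # Same effect as numpy.add(x, y) inside reduceByKey.
            sums[key] = [a + b for a, b in zip(sums[key], vec)]
        else:
            sums[key] = list(vec)
    result = {}
    for key, vec in sums.items():
        # L1 norm of a 1-D vector; assumes it is non-zero (the original divides too).
        l1 = sum(abs(x) for x in vec)
        result[key] = [x / l1 for x in vec]
    return result
```

For example, aggregate_and_normalize([(1, [1.0, 3.0]), (1, [1.0, 1.0]), (2, [0.5, 0.5])]) gives roughly {1: [0.333, 0.667], 2: [0.5, 0.5]}.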

I estimated that d4 would be about 340G. I run the job on an r3.8xlarge machine:

mem: 244G
cpu: 64
Disk: 640G
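A quick back-of-the-envelope check of the sizes involved, assuming each value is stored as an 8-byte double (the stack trace shows Double values) and ignoring per-row and per-array overhead. The helper name dense_bytes is hypothetical.

```python
BYTES_PER_DOUBLE = 8   # assumption: vector elements are 8-byte doubles
VECTOR_LEN = 5000      # length of each value list in d2

def dense_bytes(rows, vector_len=VECTOR_LEN):
    """Raw payload size of `rows` dense vectors, ignoring all Spark/JVM overhead."""
    return rows * vector_len * BYTES_PER_DOUBLE

one_vector = dense_bytes(1)          # 40,000 bytes (about 40 KB)
d2_raw = dense_bytes(2_000_000)      # 8e10 bytes (about 80 GB)
d3_raw = dense_bytes(500_000_000)    # 2e13 bytes (about 20 TB), if every d1 row matches

print(f"vector: {one_vector / 1e3:.0f} KB, d2: {d2_raw / 1e9:.0f} GB, d3: {d3_raw / 1e12:.0f} TB")
```

These are lower bounds, but they suggest that the join output, which has to flow through reduceByKey (and through Python, via d3.rdd), dominates the cost far more than d1 or d2 on their own.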

Questions

I tried several configurations, but I always get OOM errors in the executors. So my questions are:

  • Is it possible to run this job on the current machine type, or should I just use a bigger machine (and if so, how big)? I remember coming across articles/blogs describing terabyte-scale processing on relatively small machines.

  • What improvements should I make, e.g. Spark configuration or code optimization?

  • Is it possible to estimate the amount of memory needed for each executor?
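On estimating executor memory, here is a rough sketch of the arithmetic, assuming Spark 2.0 defaults on YARN: 300 MB of reserved heap, spark.memory.fraction = 0.6 for the unified execution/storage region shared by all concurrently running tasks, and a container overhead of max(384 MB, 10% of executor memory) for spark.yarn.executor.memoryOverhead. A cluster (e.g. EMR) may override these defaults. With PySpark, the Python worker processes run outside the JVM heap but count against the same YARN container limit, which is why the YARN error suggests boosting spark.yarn.executor.memoryOverhead. The function name is hypothetical.

```python
RESERVED_MB = 300         # fixed reserved heap in Spark 2.x
MEMORY_FRACTION = 0.6     # default spark.memory.fraction in Spark 2.0
OVERHEAD_FRACTION = 0.10  # default factor behind spark.yarn.executor.memoryOverhead
MIN_OVERHEAD_MB = 384     # minimum default overhead

def estimate_executor_memory(executor_memory_gb, executor_cores):
    """Rough memory budget (in MB) for one executor under the assumptions above."""
    heap_mb = executor_memory_gb * 1024
    overhead_mb = max(MIN_OVERHEAD_MB, int(heap_mb * OVERHEAD_FRACTION))
    unified_mb = (heap_mb - RESERVED_MB) * MEMORY_FRACTION
    return {
        "container_mb": heap_mb + overhead_mb,       # limit YARN enforces
        "unified_mb": unified_mb,                    # execution + storage, shared by tasks
        "per_task_mb": unified_mb / executor_cores,  # if every core runs a task at once
    }

for mem_gb, cores in [(48, 15), (6, 2), (15, 5)]:
    print(mem_gb, cores, estimate_executor_memory(mem_gb, cores))
```

For config1 this works out to roughly 1.9 GB of unified memory per running task, which has to hold many 5000-element vectors at once during the join and aggregation.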

Spark Configuration

Some Spark configurations I tried

config1:

--verbose
--conf spark.sql.shuffle.partitions=200
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3

config2:

--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3

config 3:

--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=true
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--executor-memory 6G
--executor-cores 2
--driver-memory 6G
--driver-cores 3

config 4:

--verbose
--conf spark.sql.shuffle.partitions=20000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 13
--executor-memory 15G
--executor-cores 5
--driver-memory 13G
--driver-cores 5

ERROR

OOM Error1 from executor

ExecutorLostFailure (executor 14 exited caused by one of the running  tasks) Reason: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Heap
PSYoungGen      total 1830400K, used 1401721K [0x0000000740000000,   0x00000007be900000, 0x00000007c0000000)
eden space 1588736K, 84% used [0x0000000740000000,0x0000000791e86980,0x00000007a0f80000)
from space 241664K, 24% used [0x00000007af600000,0x00000007b3057de8,0x00000007be200000)
to  space 236032K, 0% used [0x00000007a0f80000,0x00000007a0f80000,0x00000007af600000)
ParOldGen      total 4194304K, used 4075884K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 97% used [0x0000000640000000,0x0000000738c5b198,0x0000000740000000)
Metaspace      used 59721K, capacity 60782K, committed 61056K,  reserved 1101824K
class space    used 7421K, capacity 7742K, committed 7808K, reserved 1048576K

OOM Error2 from executor

ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container marked as failed: container_1477662810360_0002_01_000008 on host: ip-172-18-9-130.ec2.internal. Exit status: 52. Diagnostics: Exception from container-launch.

Heap
PSYoungGen      total 1968128K, used 1900544K [0x0000000740000000, 0x00000007c0000000, 0x00000007c0000000)
eden space 1900544K, 100% used [0x0000000740000000,0x00000007b4000000,0x00000007b4000000)
from space 67584K, 0% used [0x00000007b4000000,0x00000007b4000000,0x00000007b8200000)
to  space 103936K, 0% used [0x00000007b9a80000,0x00000007b9a80000,0x00000007c0000000)
ParOldGen      total 4194304K, used 4194183K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 99% used [0x0000000640000000,0x000000073ffe1f38,0x0000000740000000)
Metaspace      used 59001K, capacity 59492K, committed 61056K, reserved 1101824K
class space    used 7300K, capacity 7491K, committed 7808K, reserved 1048576K

Error from Container

16/10/28 14:33:21 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$3$$anon$2.hasNext(WholeStageCodegenExec.scala:386)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:36 ERROR Utils: Uncaught exception in thread driver-heartbeater
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Double.valueOf(Double.java:519)
    at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.get(UnsafeArrayData.java:138)
    at org.apache.spark.sql.catalyst.util.ArrayData.foreach(ArrayData.scala:135)
    at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:64)
    at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:57)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2517)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2517)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:43 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[stdout writer for python,5,main]

Update 1

It looks like d1 is heavily skewed when partitioned by id2, and as a result the join causes OOM. If d1 were evenly distributed, as I had assumed, the configurations above should work.
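One way to confirm this kind of skew is to count rows per join key; in PySpark that would be something like d1.groupBy("id2").count().orderBy("count", ascending=False). The same idea as a local, pure-Python sketch, where the function name skewed_keys and the share threshold are hypothetical:

```python
from collections import Counter

def skewed_keys(rows, threshold):
    """rows: (id1, id2) tuples.

    Returns (id2, count) pairs whose share of all rows exceeds `threshold`,
    most frequent first.
    """
    counts = Counter(id2 for _, id2 in rows)
    total = sum(counts.values())
    return [(k, c) for k, c in counts.most_common() if c / total > threshold]

print(skewed_keys([(1, 1), (1, 3), (2, 1), (3, 1), (4, 1)], 0.5))  # [(1, 4)]
```

Keys found this way are natural candidates for the subset s used in Attempt1.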

Update 2

I'm posting my attempts to solve the problem in case someone else runs into something similar.

Attempt1

My problem is that if I partition d1 by id2, the data is heavily skewed: a few partitions contain almost all of the id1 values, so the JOIN with d2 causes an OOM error. To mitigate this, I first identify a subset s of id2 values that causes the skew when partitioning by id2. Then I create d5 from d2 containing only the keys in s, and d6 from d2 excluding them. Luckily, d5 is not too big, so I can broadcast join d1 with d5. Then I join d1 with d6, union the two results, and do a reduceByKey. This came very close to solving the problem, but I didn't pursue it further because d1 could grow significantly bigger later on. In other words, this approach is not really scalable for me.
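The split-and-union idea from Attempt1 can be sketched locally in pure Python, assuming id2 is unique in d2 (one value list per id2) and using plain lists in place of DataFrames; in PySpark the small side would typically be marked with pyspark.sql.functions.broadcast. The function names are hypothetical. Because d5 and d6 split d2 by key, joining d1 with each and taking the union gives the same rows as the original join:

```python
def hash_join(left, right):
    """Equi-join (id1, id2) rows with (id2, value) rows on id2."""
    by_key = {}
    for id2, value in right:
        by_key.setdefault(id2, []).append(value)
    return [(id1, v) for id1, id2 in left for v in by_key.get(id2, [])]

def split_skew_join(d1, d2, hot_keys):
    """Join d1 with d2 by handling the skewed keys separately, then taking the union."""
    d5 = [(k, v) for k, v in d2 if k in hot_keys]      # small: only the hot keys
    d6 = [(k, v) for k, v in d2 if k not in hot_keys]  # everything else
    # Broadcast-style join: the small d5 becomes a local lookup table
    # (assumes id2 is unique in d2).
    lookup = dict(d5)
    hot = [(id1, lookup[id2]) for id1, id2 in d1 if id2 in lookup]
    # Regular join for the remaining keys; hot-key rows find no match in d6.
    cold = hash_join(d1, d6)
    return hot + cold

d1 = [(1, 1), (1, 3), (2, 1), (3, 1), (4, 0)]
d2 = [(0, [0.1]), (1, [0.2]), (3, [0.3])]
print(sorted(split_skew_join(d1, d2, {1})) == sorted(hash_join(d1, d2)))  # True
```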

Attempt2

Luckily, in my case most values in d2 are very small. For my application, I can safely remove the small values and convert each vector to a SparseVector, which significantly reduces the size of d2. After doing this, I partition d1 by id1 and broadcast join it with d2 (after removing the small values). Of course, the driver memory has to be increased to accommodate a relatively large broadcast variable. This works for me and also scales for my application.
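A pure-Python sketch of the Attempt2 idea: drop entries below an application-specific threshold and keep only (size, indices, values), the same triple that pyspark.ml.linalg.SparseVector(size, indices, values) accepts, plus an element-wise sum that could serve as the reduceByKey function on sparse vectors. The function names and the threshold are hypothetical, and dropping values is only safe when, as here, the application tolerates it.

```python
def sparsify(values, threshold):
    """Drop entries with |x| < threshold; return (size, indices, values)."""
    indices = [i for i, x in enumerate(values) if abs(x) >= threshold]
    return len(values), indices, [values[i] for i in indices]

def add_sparse(a, b):
    """Element-wise sum of two (size, indices, values) vectors of the same size."""
    size, idx_a, val_a = a
    size_b, idx_b, val_b = b
    if size != size_b:
        raise ValueError("vectors must have the same size")
    acc = dict(zip(idx_a, val_a))
    for i, v in zip(idx_b, val_b):
        acc[i] = acc.get(i, 0.0) + v
    indices = sorted(acc)
    return size, indices, [acc[i] for i in indices]

print(sparsify([0.1, 0.2, 0.0001, 0.0], 0.01))  # (4, [0, 1], [0.1, 0.2])
```

A dense 5000-element vector is about 40 KB of raw doubles, while the sparse form costs roughly 12 bytes (a 4-byte index plus an 8-byte value) per kept entry, so it pays off once fewer than about two thirds of the entries survive.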

asked Oct 28 '16 16:10 by xuan


2 Answers

Here's something to try: reduce your executor size by a bit. You've currently got:

--executor-memory 48G
--executor-cores 15

Give this a go:

--executor-memory 16G
--executor-cores 5

Smaller executors seem to work better for a variety of reasons. One is that a Java heap larger than about 32G disables compressed object pointers, so object references grow from 4 bytes to 8 and memory requirements go up across the board.
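A rough sketch of how many executors of a given size fit on one r3.8xlarge (244G, 64 cores), assuming about 10% memory overhead per executor and setting aside a hypothetical 8 GB and 1 core per node for the OS and Hadoop daemons. YARN on EMR may expose less memory than the physical total, so treat the results as upper bounds. The function name is hypothetical.

```python
def executors_per_node(node_mem_gb, node_cores, executor_mem_gb, executor_cores,
                       reserved_mem_gb=8, reserved_cores=1, overhead_fraction=0.10):
    """How many executors fit on one node under the (hypothetical) reservations above."""
    usable_mem_gb = node_mem_gb - reserved_mem_gb
    usable_cores = node_cores - reserved_cores
    container_gb = executor_mem_gb * (1 + overhead_fraction)  # heap + overhead
    by_memory = int(usable_mem_gb // container_gb)
    by_cores = usable_cores // executor_cores
    return min(by_memory, by_cores)

for mem_gb, cores in [(48, 15), (16, 5)]:
    n = executors_per_node(244, 64, mem_gb, cores)
    print(f"{mem_gb}G x {cores} cores: {n} executors, {n * cores} cores busy")
```

Under these assumptions both layouts keep 60 cores busy; the difference is that each 16G heap stays well below the ~32G point where the JVM gives up compressed object pointers.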

Edit: the problem may actually be that the d4 partitions are too large (though the other advice still applies!). You can resolve this by repartitioning d3 into a larger number of partitions (roughly four times d1's partition count), or by passing that number as the optional numPartitions argument of reduceByKey. Repartitioning adds a shuffle of its own, while reduceByKey shuffles anyway, but either is better than crashing.
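PySpark's reduceByKey takes an optional numPartitions argument (reduceByKey(func, numPartitions=None, ...)). A pure-Python sketch of hash partitioning, using Python's built-in hash on integer keys as a stand-in for PySpark's portable_hash and a hypothetical partition_sizes helper, shows why more partitions help when keys are spread out but not when one key dominates: every record with the same key lands in the same partition, however many partitions there are.

```python
from collections import Counter

def partition_sizes(keys, num_partitions):
    """Records per partition under hash partitioning (Python's hash as a stand-in)."""
    counts = Counter(hash(k) % num_partitions for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

spread = list(range(1000))
skewed = [7] * 900 + list(range(100))  # one hot key plus 100 ordinary keys

print(max(partition_sizes(spread, 10)), max(partition_sizes(spread, 100)))  # 100 10
print(max(partition_sizes(skewed, 10)), max(partition_sizes(skewed, 100)))  # 910 901
```

This is one reason extra partitions alone may not rescue a join like the one in Update 1, where a few id2 values carry most of d1.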

answered Nov 04 '22 00:11 by Tim


I had the same problem, and none of the many answers I found solved it. Eventually I debugged my code step by step and found that it was caused by unbalanced data sizes across partitions. Just do df_rdd.repartition(nums).
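Unlike key-based partitioning, a plain repartition(n) does not look at keys: it spreads rows roughly evenly (the RDD implementation hands them out round-robin, starting from a random partition). A pure-Python sketch with a hypothetical round_robin function shows that even a heavily repeated key ends up balanced. A later key-based step such as reduceByKey or a join shuffles by key again, though.

```python
def round_robin(rows, num_partitions):
    """Assign rows to partitions in turn, ignoring their keys."""
    parts = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        parts[i % num_partitions].append(row)
    return parts

skewed = [7] * 900 + list(range(100))
print([len(p) for p in round_robin(skewed, 10)])  # ten partitions of 100 rows each
```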

answered Nov 03 '22 23:11 by DoG