I'm running into a memory leak in the Spark driver that I can't figure out. My guess is that it has something to do with overwriting a DataFrame variable, but I can't find any documentation or other issues like this.
This is on Spark 2.1.0 (PySpark).
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Spark Leak") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext.getOrCreate(sc)

items = 5000000
data = [str(x) for x in range(1, items)]

df = sqlContext.createDataFrame(data, StringType())
print(df.count())

for x in range(0, items):
    sub_df = sqlContext.createDataFrame([str(x)], StringType())
    df = df.subtract(sub_df)
    print(df.count())
This will continue to run until the driver runs out of memory and dies, with errors like:
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:224)
at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:917)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1089)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1081)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1081)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1184)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1717)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1675)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1664)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
17/05/25 16:55:40 ERROR DAGScheduler: Failed to update accumulators for task 13
java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:915)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1089)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1081)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1081)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1184)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1717)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1675)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1664)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
...
If anything, I'd expect memory usage to shrink since items are being removed from the DataFrame, but that's not the case.
Am I misunderstanding how Spark assigns DataFrames to Python variables?
I've also tried assigning the result of df.subtract to a temporary variable, unpersisting df, assigning the temporary variable back to df, and then unpersisting the temporary variable, but that has the same problem.
The fundamental problem here seems to be understanding what exactly a DataFrame is (this applies to Spark RDDs as well). A local DataFrame object doesn't hold data; it effectively describes a computation to be performed when some action is executed on it.
As a result, it is a recursive structure that captures all of its dependencies, so the execution plan effectively grows with each iteration. While Spark provides tools, like checkpointing, which can be used to address this problem and cut the lineage, the code in question doesn't make much sense in the first place.
Distributed data structures in Spark are designed for high-latency, IO-intensive jobs. Parallelizing individual objects and executing millions of Spark jobs on millions of single-item DataFrames just cannot work well.
Furthermore, Spark is not designed for efficient single-item operations. Each subtract is O(N), making the whole process O(N²) and effectively useless on any large dataset.
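You can see the same complexity argument with plain local collections (a hypothetical illustration, no Spark involved): removing elements one at a time costs O(N) per removal, i.e. O(N²) overall, while a single bulk difference is O(N).

```python
# Local analogue of the complexity argument above (illustration only)
items = [str(x) for x in range(1000)]
to_remove = {str(x) for x in range(500)}

# Per-item approach: mirrors calling subtract once per element
result_slow = list(items)
for x in to_remove:
    result_slow.remove(x)          # O(N) scan for every single removal

# Bulk approach: mirrors a single subtract of two DataFrames
result_fast = [x for x in items if x not in to_remove]  # O(N) total

assert sorted(result_slow) == sorted(result_fast)
```

The same asymmetry holds in Spark, except each per-item "removal" is an entire distributed job rather than a list scan, so the overhead is far worse.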
While a trivial example like this has little practical value on its own, a correct way to do it would be something like this:
items = 5000000
df1 = spark.range(items).selectExpr("cast(id as string)")
df2 = spark.range(items).selectExpr("cast(id as string)")
df1.subtract(df2)