
Is this a bug in Spark Streaming or a memory leak?

I submit my code to a Spark standalone cluster. The submit command is as follows:

nohup ./bin/spark-submit  \  
--master spark://ES01:7077 \
--executor-memory 4G \
--num-executors 1 \
--total-executor-cores 1 \
--conf "spark.storage.memoryFraction=0.2"  \
./myCode.py 1>a.log 2>b.log &

I specify that the executor should use 4G of memory in the command above. But when I monitor the executor process with the top command, I notice that its memory usage keeps growing. The current top output is below:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                                                                                                    
12578 root      20   0 20.223g 5.790g  23856 S  61.5 37.3  20:49.36 java       

My total memory is 16G, so 37.3% is already more than the 4G I specified, and it is still growing.

Using the ps command, you can see that it is the executor process:

[root@ES01 ~]# ps -awx | grep spark | grep java
10409 ?        Sl     1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080
10603 ?        Sl     6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077
12420 ?        Sl    10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py
12578 ?        Sl    21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://[email protected]:52660

Below is the code. It is very simple, so I do not think there is a memory leak:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    dataDirectory = '/stream/raw'

    sc = SparkContext(appName="Netflow")
    ssc = StreamingContext(sc, 20)   # 20-second batch interval

    # Read CSV files as they appear in the directory
    lines = ssc.textFileStream(dataDirectory)

    # Process each micro-batch RDD
    lines.foreachRDD(process)

    ssc.start()
    ssc.awaitTermination()

The code for the process function is below. Please note that I am using HiveContext, not SQLContext, here because SQLContext does not support window functions:

from pyspark.sql import HiveContext, Row
from pyspark.sql import functions as func
from pyspark.sql.window import Window

def getSqlContextInstance(sparkContext):
    # Lazily create a single HiveContext and reuse it across all batches
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(time, rdd):

    if rdd.isEmpty():
        return sc.emptyRDD()

    sqlContext = getSqlContextInstance(rdd.context)

    # Convert the CSV lines to a DataFrame
    parts = rdd.map(lambda l: l.split(","))
    rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11])))
    dataframe = sqlContext.createDataFrame(rowRdd)

    # Get the top 2 interfaces of each router
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
    rank = func.dense_rank().over(windowSpec)
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")

    ret.show()
    dataframe.show()
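
For reference, the grouping and ranking above are roughly equivalent to a single Spark SQL query. Below is a hedged sketch: raw_df stands for the DataFrame returned by createDataFrame (before the groupBy), and the temporary table name netflow is made up for illustration.

# Hedged sketch: the same "top 2 interfaces per router" logic as one query.
# raw_df = the DataFrame produced by sqlContext.createDataFrame(rowRdd) above;
# the temp table name "netflow" is illustrative only.
raw_df.registerTempTable("netflow")
top2 = sqlContext.sql("""
    SELECT router, interface, bits, rank
    FROM (
        SELECT router, interface, bits,
               DENSE_RANK() OVER (PARTITION BY router ORDER BY bits DESC) AS rank
        FROM (
            SELECT router, interface, SUM(bits) AS bits
            FROM netflow
            GROUP BY router, interface
        ) grouped
    ) ranked
    WHERE rank <= 2
""")
top2.show()

Window functions such as DENSE_RANK() are exactly why a HiveContext is needed on Spark 1.6; the plain SQLContext would not accept this query.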

I actually found that the code below causes the problem:

    # Get the top 2 interface of each router
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits'))
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc())
    rank = func.dense_rank().over(windowSpec)
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2")
    ret.show()

If I remove these five lines, the code can run all night without the memory increasing, but adding them back causes the executor's memory usage to grow very high.

Basically, the above code is just some window + groupBy in Spark SQL. So is this a bug?

Kramer Li asked May 11 '16

1 Answer

Disclaimer: this answer isn't based on debugging, but rather on observations and the documentation Apache Spark provides.

I don't believe that this is a bug to begin with!

Looking at your configuration, we can see that you are focusing mostly on executor tuning, which isn't wrong, but you are forgetting the driver part of the equation.

Looking at the Spark cluster overview from the Apache Spark documentation:

(Diagram from the Spark docs: a driver program connects through a cluster manager to worker nodes, each of which runs an executor.)

As you can see, each worker has an executor. However, in your case the worker node is the same as the driver node, which is frankly the case when you run locally or on a single-node standalone cluster.

Furthermore, the driver takes 1G of memory by default unless it is tuned using the spark.driver.memory setting. On top of that, you should not forget about the heap usage of the JVM itself, and about the Web UI, which is also handled by the driver, AFAIK.
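
As a quick sanity check, you can print the memory-related settings your running application actually ended up with. The following is a minimal sketch: it reuses the SparkContext sc from the question's code, and the fallback strings passed to get() are only illustrative defaults for keys that were never set.

# Minimal sketch: inspect the effective configuration of the running app.
conf = sc.getConf()
print(conf.get("spark.driver.memory", "1g (Spark default)"))
print(conf.get("spark.executor.memory", "not set"))
print(conf.get("spark.cores.max", "not set"))

If the driver needs more headroom, it is usually given via the --driver-memory flag of spark-submit (or spark.driver.memory in spark-defaults.conf), because by the time your Python code runs, the driver JVM has already been started.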

When you delete the lines of code you mentioned, your code is left without an action, since map is just a transformation; hence there is no execution, and therefore you don't see the memory increase at all.

The same applies to groupBy: it is just a transformation that will not be executed unless an action is called, which in your case is agg and show further down the stream, as the sketch below illustrates.
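
To illustrate the laziness point, here is a minimal, self-contained sketch. The sample rows are made up, and it assumes an existing SparkContext sc and HiveContext sqlContext like the ones in the question.

from pyspark.sql import Row
from pyspark.sql import functions as func

# Minimal sketch of Spark's laziness, using made-up sample data.
rows = sc.parallelize([Row(router='r1', bits=10),
                       Row(router='r1', bits=20),
                       Row(router='r2', bits=5)])
df = sqlContext.createDataFrame(rows)

grouped = df.groupBy('router').agg(func.sum('bits').alias('bits'))
# Nothing has been computed yet at this point.
grouped.show()   # the job actually runs when show() is called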

That said, try to minimize your driver memory and the overall number of cores in Spark, which is defined by spark.cores.max, if you want to control the number of cores on this process, and then cascade down to the executors. Moreover, I would add spark.python.profile.dump to your list of configuration options so you can get a profile of your Spark job execution, which can help you understand the case better and tune your cluster to your needs.
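
A minimal sketch of what that configuration could look like when building the context is shown below; the core count and the dump directory are illustrative values, not recommendations.

from pyspark import SparkConf, SparkContext

# Minimal sketch: cap the total cores and enable the PySpark profiler.
# The values and the dump directory are illustrative only.
conf = (SparkConf()
        .setAppName("Netflow")
        .set("spark.cores.max", "1")
        .set("spark.python.profile", "true")
        .set("spark.python.profile.dump", "/tmp/netflow_profiles"))

sc = SparkContext(conf=conf)

With spark.python.profile enabled, the accumulated worker profiles are dumped into the given directory before the driver exits, so you can see where the Python side of the job spends its time.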

mamdouh alramadan answered Sep 19 '22