
Spark: GraphX API OOM errors after unpersisting unused RDDs

I am hitting an OutOfMemoryError for reasons I cannot identify. I release RDDs as soon as they are no longer needed, but after several rounds of the loop the OOM error still appears. My code is as follows:

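import org.apache.spark.HashPartitioner
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.util.Random

// single source shortest path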
def sssp[VD](graph:Graph[VD,Double], source: VertexId): Graph[Double, Double] = {
    graph.mapVertices((id, _) => if (id == source) 0.0 else Double.PositiveInfinity)
        .pregel(Double.PositiveInfinity)(
            (id, dist, newDist) => scala.math.min(dist, newDist),
            triplet => {
                if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
                    Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
                }
                else {
                    Iterator.empty
                }
            },
            (a, b) => math.min(a, b)
        )
}

def selectCandidate(candidates: RDD[(VertexId, (Double, Double))]): VertexId = {
    Random.setSeed(System.nanoTime())
    val selectLow = Random.nextBoolean()
    val (vid, (_, _)) = if (selectLow) {
        println("Select lowest bound")
        candidates.reduce((x, y) => if (x._2._1 < y._2._1) x else y)
    } else {
        println("Select highest bound")
        candidates.reduce((x, y) => if (x._2._2 > y._2._2) x else y)
    }
    vid
}

val g = {/* load graph from hdfs*/}.partitionBy(EdgePartition2D,eParts).cache
println("Vertices Size: " +  g.vertices.count )
println("Edges Size: " +  g.edges.count )

val resultDiameter = {

    val diff = 0d
    val maxIterations = 100
    val filterJoin = 1e5
    val vParts = 100

    var deltaHigh = Double.PositiveInfinity
    var deltaLow = Double.NegativeInfinity

    var candidates = g.vertices.map(x => (x._1, (Double.NegativeInfinity,
        Double.PositiveInfinity)))
        .partitionBy(new HashPartitioner(vParts))
        .persist(StorageLevel.MEMORY_AND_DISK) // (vid, low, high)

    var round = 0
    var candidateCount = candidates.count
    while (deltaHigh - deltaLow > diff && candidateCount > 0 && round <= maxIterations) {

        val currentVertex = dia.selectCandidate(candidates)

        val dist: RDD[(VertexId, Double)] = dia.sssp(g, currentVertex)
            .vertices
            .partitionBy(new HashPartitioner(vParts)) // join more efficiently
            .persist(StorageLevel.MEMORY_AND_DISK)
        val eccentricity = dist.map({ case (vid, length) => length }).max
        println("Eccentricity = %.1f".format(eccentricity))

        val subDist =  if(candidateCount > filterJoin) {
            println("Directly use Dist")
            dist
        } else {  // when there are at most filterJoin candidates, filter out the vertices that are no longer needed
            println("Filter Dist")
            val candidatesMap = candidates.sparkContext.broadcast(
                candidates.collect.toMap)
            val subDist = dist.filter({case (vid, length) =>
                candidatesMap.value.contains(vid)})
                .persist(StorageLevel.MEMORY_AND_DISK)
            println("Sub Dist Count: " + subDist.count)
            subDist
        }

        var previousCandidates = candidates
        candidates = candidates.join(subDist).map({ case (vid, ((low, high), d)) =>
            (vid,
                (Array(low, eccentricity - d, d).max,
                    Array(high, eccentricity + d).min))
        }).persist(StorageLevel.MEMORY_AND_DISK)
        candidateCount = candidates.count
        println("Candidates Count 1 : " + candidateCount)
        previousCandidates.unpersist(true) // release useless rdd
        dist.unpersist(true) // release useless rdd

        deltaLow = Array(deltaLow,
            candidates.map({ case (_, (low, _)) => low }).max).max
        deltaHigh = Array(deltaHigh, 2 * eccentricity,
            candidates.map({ case (_, (_, high)) => high }).max).min

        previousCandidates = candidates
        candidates = candidates.filter({ case (_, (low, high)) =>
            !((high <= deltaLow && low >= deltaHigh / 2d) || low == high)
        })
            .partitionBy(new HashPartitioner(vParts))  // join more efficiently
            .persist(StorageLevel.MEMORY_AND_DISK)
        candidateCount = candidates.count
        println("Candidates Count 2:" + candidateCount)
        previousCandidates.unpersist(true) // release useless rdd

        round += 1
        println(s"Round=${round},Low=${deltaLow}, High=${deltaHigh}, Candidates=${candidateCount}")
    }

    deltaLow
}

println(s"Diameter $resultDiameter")
println("Complete!")

The main data in the while block are the graph object g and the RDD candidates. g is used to compute the single-source shortest paths in each round, and the graph structure does not change. The size of candidates decreases round by round.
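To make the per-round update easier to follow, here is a minimal sketch that replays the bound-tightening step of the loop on plain Scala Maps instead of RDDs. The object name BoundUpdate and the method step are hypothetical, introduced only for illustration; the formulas are copied from the join, map and filter steps in the code above.

```scala
object BoundUpdate {
  type Bounds = (Double, Double) // (low, high) bounds kept for one vertex

  // One round of the loop on local Maps (illustrative only, not the RDD code).
  // dist holds the distance from the selected vertex to every vertex.
  // Assumes at least one candidate also appears in dist.
  // Returns (surviving candidates, new deltaLow, new deltaHigh).
  def step(
      candidates: Map[Long, Bounds],
      dist: Map[Long, Double],
      deltaLow: Double,
      deltaHigh: Double): (Map[Long, Bounds], Double, Double) = {
    val eccentricity = dist.values.max

    // Same formulas as the join + map step in the question.
    val updated: Map[Long, Bounds] = candidates.collect {
      case (vid, (low, high)) if dist.contains(vid) =>
        val d = dist(vid)
        (vid, (Seq(low, eccentricity - d, d).max, Seq(high, eccentricity + d).min))
    }

    val newLow = math.max(deltaLow, updated.values.map(_._1).max)
    val newHigh = Seq(deltaHigh, 2 * eccentricity, updated.values.map(_._2).max).min

    // Same pruning predicate as the filter step in the question.
    val remaining = updated.filter { case (_, (low, high)) =>
      !((high <= newLow && low >= newHigh / 2d) || low == high)
    }
    (remaining, newLow, newHigh)
  }
}
```

For a path graph 1-2-3-4 with unit edge weights and vertex 1 as the source, the eccentricity is 3, vertex 1 ends up with low == high == 3 and is dropped, and the other three vertices remain candidates.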

In each round I manually unpersist the RDDs that are no longer needed, in blocking mode, so I would expect enough memory to be available for the following operations. However, the job stops with an OOM error in round 6 or 7, seemingly at random. By round 6 or 7 the number of candidates has dropped sharply, to about 10% or less of the original. Sample output follows; the candidate count decreases from 15,288,624 in round 1 to 67,451 in round 7:

Vertices Size: 15,288,624
Edges Size: 228,097,574
Select lowest bound
Eccentricity = 12.0
Directly use Dist
Candidates Count 1 : 15288624
Candidates Count 2:15288623
Round=1,Low=12.0, High=24.0, Candidates=15288623
Select lowest bound
Eccentricity = 13.0
Directly use Dist
Candidates Count 1 : 15288623
Candidates Count 2:15288622
Round=2,Low=13.0, High=24.0, Candidates=15288622
Select highest bound
Eccentricity = 18.0
Directly use Dist
Candidates Count 1 : 15288622
Candidates Count 2:6578370
Round=3,Low=18.0, High=23.0, Candidates=6578370
Select lowest bound
Eccentricity = 12.0
Directly use Dist
Candidates Count 1 : 6578370
Candidates Count 2:6504563
Round=4,Low=18.0, High=23.0, Candidates=6504563
Select lowest bound
Eccentricity = 11.0
Directly use Dist
Candidates Count 1 : 6504563
Candidates Count 2:412789
Round=5,Low=18.0, High=22.0, Candidates=412789
Select highest bound
Eccentricity = 17.0
Directly use Dist
Candidates Count 1 : 412789
Candidates Count 2:288670
Round=6,Low=18.0, High=22.0, Candidates=288670
Select highest bound
Eccentricity = 18.0
Directly use Dist
Candidates Count 1 : 288670
Candidates Count 2:67451
Round=7,Low=18.0, High=22.0, Candidates=67451

The tail of the spark.info log:

6/12/12 14:03:09 WARN YarnAllocator: Expected to find pending requests, but found none.
16/12/12 14:06:21 INFO YarnAllocator: Canceling requests for 0 executor containers
16/12/12 14:06:33 WARN YarnAllocator: Expected to find pending requests, but found none.
16/12/12 14:14:26 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
16/12/12 14:18:14 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
    at io.netty.util.internal.MpscLinkedQueue.offer(MpscLinkedQueue.java:123)
    at io.netty.util.internal.MpscLinkedQueue.add(MpscLinkedQueue.java:218)
    at io.netty.util.concurrent.SingleThreadEventExecutor.fetchFromScheduledTaskQueue(SingleThreadEventExecutor.java:260)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:347)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:744)
16/12/12 14:18:14 WARN DFSClient: DFSOutputStream ResponseProcessor exception  for block BP-552217672-100.76.16.204-1470826698239:blk_1377987137_304302272
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
16/12/12 14:14:39 WARN AbstractConnector: 
java.lang.OutOfMemoryError: Java heap space
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:233)
    at org.spark-project.jetty.server.nio.SelectChannelConnector.accept(SelectChannelConnector.java:109)
    at org.spark-project.jetty.server.AbstractConnector$Acceptor.run(AbstractConnector.java:938)
    at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
    at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
    at java.lang.Thread.run(Thread.java:744)
16/12/12 14:20:06 INFO ApplicationMaster: Final app status: FAILED, exitCode: 12, (reason: Exception was thrown 1 time(s) from Reporter thread.)
16/12/12 14:19:38 WARN DFSClient: Error Recovery for block BP-552217672-100.76.16.204-1470826698239:blk_1377987137_304302272 in pipeline 100.76.15.28:9003, 100.76.48.218:9003, 100.76.48.199:9003: bad datanode 100.76.15.28:9003
16/12/12 14:18:58 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
16/12/12 14:20:49 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-198] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
16/12/12 14:20:49 INFO SparkContext: Invoking stop() from shutdown hook
16/12/12 14:20:49 INFO ContextCleaner: Cleaned shuffle 446
16/12/12 14:20:49 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveRdd(2567)] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Failure.recover(Try.scala:185)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
    at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
    at scala.concurrent.Future$class.recover(Future.scala:324)
    at scala.concurrent.impl.Promise$DefaultPromise.recover(Promise.scala:153)
    at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:376)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:100)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:104)
    at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1630)
    at org.apache.spark.ContextCleaner.doCleanupRDD(ContextCleaner.scala:208)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:185)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
    at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
    at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.
    at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132)
    at org.apache.spark.rpc.akka.AkkaRpcEndpointRef.ask(AkkaRpcEnv.scala:364)
    ... 12 more
16/12/12 14:20:49 WARN QueuedThreadPool: 5 threads could not be stopped
16/12/12 14:20:49 INFO SparkUI: Stopped Spark web UI at http://10.215.154.152:56338
16/12/12 14:20:49 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/12/12 14:20:49 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/12/12 14:21:04 WARN AkkaRpcEndpointRef: Error sending message [message = RemoveRdd(2567)] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Recipient[Actor[akka://sparkDriver/user/BlockManagerMaster#-213595070]] had already been terminated.. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)

The tail of the gc.log:

2016-12-12T14:10:43.541+0800: 16832.953: [Full GC 2971008K->2971007K(2971008K), 11.4284920 secs]
2016-12-12T14:10:54.990+0800: 16844.403: [Full GC 2971007K->2971007K(2971008K), 11.4479110 secs]
2016-12-12T14:11:06.457+0800: 16855.870: [GC 2971007K(2971008K), 0.6827710 secs]
2016-12-12T14:11:08.825+0800: 16858.237: [Full GC 2971007K->2971007K(2971008K), 11.5480350 secs]
2016-12-12T14:11:20.384+0800: 16869.796: [Full GC 2971007K->2971007K(2971008K), 11.0481490 secs]
2016-12-12T14:11:31.442+0800: 16880.855: [Full GC 2971007K->2971007K(2971008K), 11.0184790 secs]
2016-12-12T14:11:42.472+0800: 16891.884: [Full GC 2971008K->2971008K(2971008K), 11.3124900 secs]
2016-12-12T14:11:53.795+0800: 16903.207: [Full GC 2971008K->2971008K(2971008K), 10.9517160 secs]
2016-12-12T14:12:04.760+0800: 16914.172: [Full GC 2971008K->2971007K(2971008K), 11.0969500 secs]
2016-12-12T14:12:15.868+0800: 16925.281: [Full GC 2971008K->2971008K(2971008K), 11.1244090 secs]
2016-12-12T14:12:27.003+0800: 16936.416: [Full GC 2971008K->2971008K(2971008K), 11.0206800 secs]
2016-12-12T14:12:38.035+0800: 16947.448: [Full GC 2971008K->2971008K(2971008K), 11.0024270 secs]
2016-12-12T14:12:49.048+0800: 16958.461: [Full GC 2971008K->2971008K(2971008K), 10.9831440 secs]
2016-12-12T14:13:00.042+0800: 16969.454: [GC 2971008K(2971008K), 0.7338780 secs]
2016-12-12T14:13:02.496+0800: 16971.908: [Full GC 2971008K->2971007K(2971008K), 11.1536860 secs]
2016-12-12T14:13:13.661+0800: 16983.074: [Full GC 2971007K->2971007K(2971008K), 10.9956150 secs]
2016-12-12T14:13:24.667+0800: 16994.080: [Full GC 2971007K->2971007K(2971008K), 11.0139660 secs]
2016-12-12T14:13:35.691+0800: 17005.104: [GC 2971007K(2971008K), 0.6693770 secs]
2016-12-12T14:13:38.115+0800: 17007.527: [Full GC 2971007K->2971006K(2971008K), 11.0514040 secs]
2016-12-12T14:13:49.178+0800: 17018.590: [Full GC 2971007K->2971007K(2971008K), 10.8881160 secs]
2016-12-12T14:14:00.076+0800: 17029.489: [GC 2971007K(2971008K), 0.7046370 secs]
2016-12-12T14:14:02.498+0800: 17031.910: [Full GC 2971007K->2971007K(2971008K), 11.3424300 secs]
2016-12-12T14:14:13.862+0800: 17043.274: [Full GC 2971008K->2971006K(2971008K), 11.6215890 secs]
2016-12-12T14:14:25.503+0800: 17054.915: [GC 2971006K(2971008K), 0.7196840 secs]
2016-12-12T14:14:27.857+0800: 17057.270: [Full GC 2971008K->2971007K(2971008K), 11.3879990 secs]
2016-12-12T14:14:39.266+0800: 17068.678: [Full GC 2971007K->2971007K(2971008K), 11.1611420 secs]
2016-12-12T14:14:50.446+0800: 17079.859: [GC 2971007K(2971008K), 0.6976180 secs]
2016-12-12T14:14:52.782+0800: 17082.195: [Full GC 2971007K->2971007K(2971008K), 11.4318900 secs]
2016-12-12T14:15:04.235+0800: 17093.648: [Full GC 2971007K->2971007K(2971008K), 11.3429010 secs]
2016-12-12T14:15:15.598+0800: 17105.010: [GC 2971007K(2971008K), 0.6832320 secs]
2016-12-12T14:15:17.930+0800: 17107.343: [Full GC 2971008K->2971007K(2971008K), 11.1898520 secs]
2016-12-12T14:15:29.131+0800: 17118.544: [Full GC 2971007K->2971007K(2971008K), 10.9680150 secs]
2016-12-12T14:15:40.110+0800: 17129.522: [GC 2971007K(2971008K), 0.7444890 secs]
2016-12-12T14:15:42.508+0800: 17131.920: [Full GC 2971007K->2971007K(2971008K), 11.3052160 secs]
2016-12-12T14:15:53.824+0800: 17143.237: [Full GC 2971007K->2971007K(2971008K), 10.9484100 secs]
2016-12-12T14:16:04.783+0800: 17154.196: [Full GC 2971007K->2971007K(2971008K), 10.9543950 secs]
2016-12-12T14:16:15.748+0800: 17165.160: [GC 2971007K(2971008K), 0.7066150 secs]
2016-12-12T14:16:18.176+0800: 17167.588: [Full GC 2971007K->2971007K(2971008K), 11.1201370 secs]
2016-12-12T14:16:29.307+0800: 17178.719: [Full GC 2971007K->2971007K(2971008K), 11.0746950 secs]
2016-12-12T14:16:40.392+0800: 17189.805: [Full GC 2971007K->2971007K(2971008K), 11.0036170 secs]
2016-12-12T14:16:51.407+0800: 17200.819: [Full GC 2971007K->2971007K(2971008K), 10.9655670 secs]
2016-12-12T14:17:02.383+0800: 17211.796: [Full GC 2971007K->2971007K(2971008K), 10.7348560 secs]
2016-12-12T14:17:13.128+0800: 17222.540: [GC 2971007K(2971008K), 0.6679470 secs]
2016-12-12T14:17:15.450+0800: 17224.862: [Full GC 2971007K->2971007K(2971008K), 10.6219270 secs]
2016-12-12T14:17:26.081+0800: 17235.494: [Full GC 2971007K->2971007K(2971008K), 10.9158450 secs]
2016-12-12T14:17:37.016+0800: 17246.428: [Full GC 2971007K->2971007K(2971008K), 11.3107490 secs]
2016-12-12T14:17:48.337+0800: 17257.750: [Full GC 2971007K->2971007K(2971008K), 11.0769460 secs]
2016-12-12T14:17:59.424+0800: 17268.836: [GC 2971007K(2971008K), 0.6707600 secs]
2016-12-12T14:18:01.850+0800: 17271.262: [Full GC 2971007K->2970782K(2971008K), 12.6348300 secs]
2016-12-12T14:18:14.496+0800: 17283.909: [GC 2970941K(2971008K), 0.7525790 secs]
2016-12-12T14:18:16.890+0800: 17286.303: [Full GC 2971006K->2970786K(2971008K), 13.1047470 secs]
2016-12-12T14:18:30.008+0800: 17299.421: [GC 2970836K(2971008K), 0.8139710 secs]
2016-12-12T14:18:32.458+0800: 17301.870: [Full GC 2971005K->2970873K(2971008K), 13.0410540 secs]
2016-12-12T14:18:45.512+0800: 17314.925: [Full GC 2971007K->2970893K(2971008K), 12.7169690 secs]
2016-12-12T14:18:58.239+0800: 17327.652: [GC 2970910K(2971008K), 0.7314350 secs]
2016-12-12T14:19:00.557+0800: 17329.969: [Full GC 2971008K->2970883K(2971008K), 11.1889000 secs]
2016-12-12T14:19:11.767+0800: 17341.180: [Full GC 2971006K->2970940K(2971008K), 11.4069700 secs]
2016-12-12T14:19:23.185+0800: 17352.597: [GC 2970950K(2971008K), 0.6689360 secs]
2016-12-12T14:19:25.484+0800: 17354.896: [Full GC 2971007K->2970913K(2971008K), 12.6980050 secs]
2016-12-12T14:19:38.194+0800: 17367.607: [Full GC 2971004K->2970902K(2971008K), 12.7641130 secs]
2016-12-12T14:19:50.968+0800: 17380.380: [GC 2970921K(2971008K), 0.6966130 secs]
2016-12-12T14:19:53.266+0800: 17382.678: [Full GC 2971007K->2970875K(2971008K), 12.9416660 secs]
2016-12-12T14:20:06.233+0800: 17395.645: [Full GC 2971007K->2970867K(2971008K), 13.2740780 secs]
2016-12-12T14:20:19.527+0800: 17408.939: [GC 2970881K(2971008K), 0.7696770 secs]
2016-12-12T14:20:22.024+0800: 17411.436: [Full GC 2971007K->2970886K(2971008K), 13.8729770 secs]
2016-12-12T14:20:35.919+0800: 17425.331: [Full GC 2971002K->2915146K(2971008K), 12.8270160 secs]
2016-12-12T14:20:48.762+0800: 17438.175: [GC 2915155K(2971008K), 0.6856650 secs]
2016-12-12T14:20:51.271+0800: 17440.684: [Full GC 2971007K->2915307K(2971008K), 12.4895750 secs]
2016-12-12T14:21:03.771+0800: 17453.184: [GC 2915320K(2971008K), 0.6249910 secs]
2016-12-12T14:21:06.377+0800: 17455.789: [Full GC 2971007K->2914274K(2971008K), 12.6835220 secs]
2016-12-12T14:21:19.129+0800: 17468.541: [GC 2917963K(2971008K), 0.6917090 secs]
2016-12-12T14:21:21.526+0800: 17470.938: [Full GC 2971007K->2913949K(2971008K), 13.0442320 secs]
2016-12-12T14:21:36.588+0800: 17486.000: [GC 2936827K(2971008K), 0.7244690 secs]

So the logs suggest that there may be a memory leak, which could be in one of two places: 1) my code, or 2) the Spark GraphX API.

Can anyone help me find the cause, if it is in my code?

bourneli asked Dec 12 '16 05:12


2 Answers

I don't think the unpersist() API is causing the OutOfMemoryError. The error is caused by collect(): unlike a transformation, collect() is an action, and it fetches the entire RDD to the single driver machine.
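The collect() in question is the one that builds candidatesMap in the "Filter Dist" branch. Below is a minimal sketch of one way to keep that step off the driver, on the assumption that the goal is only to restrict dist to the ids still present in candidates. DistFilter and restrictToCandidates are hypothetical names, and VertexId is written as Long (its GraphX alias) to keep the imports short.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object DistFilter {
  // Hypothetical helper, not part of the original post.
  // Keeps only the distances whose vertex id is still a candidate, using a
  // distributed inner join instead of collect() plus a broadcast map.
  def restrictToCandidates(
      dist: RDD[(Long, Double)],
      candidates: RDD[(Long, (Double, Double))]): RDD[(Long, Double)] = {
    // Drop the bounds and keep the ids; mapValues preserves the partitioner,
    // so co-partitioned inputs can be joined without an extra shuffle.
    val keys = candidates.mapValues(_ => ())
    dist.join(keys)                       // inner join: ids present in both RDDs
      .mapValues { case (d, _) => d }     // discard the Unit placeholder
      .persist(StorageLevel.MEMORY_AND_DISK)
  }
}
```

Since candidates.join(subDist) in the question is itself an inner join, this pre-filter is an optimisation rather than a requirement. Whichever variant is used, the persisted subDist can be released with unpersist once the new candidates RDD has been materialised.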

A few suggestions:

  1. Increasing the driver memory is a partial solution, which you have already implemented. If you are running on JDK 8, use the G1GC collector to manage large heaps.

  2. You can experiment with storage levels (MEMORY_AND_DISK, OFF_HEAP, etc.) to fine-tune memory use for your application.

Have a look at this official documentation guide for more details.
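As a rough illustration of the two suggestions, the fragment below shows where such settings could go. TuningSketch is a hypothetical name, and the choice of MEMORY_AND_DISK_SER is an assumption made for this sketch rather than a recommendation from the answer.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

object TuningSketch {
  // Executor JVMs are launched after this code runs, so their GC flags can be
  // set on the SparkConf. Driver JVM flags and driver memory have to be given
  // at launch time instead (for example spark-submit --driver-memory ... and
  // --conf spark.driver.extraJavaOptions=-XX:+UseG1GC), because the driver JVM
  // is already running by the time the application builds its SparkConf.
  val conf: SparkConf = new SparkConf()
    .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")

  // A serialized storage level trades CPU time for a smaller heap footprint.
  val candidateLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER
}
```

The storage level would then be passed to persist in place of StorageLevel.MEMORY_AND_DISK wherever the smaller footprint matters more than deserialization cost.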

Ravindra babu answered Nov 14 '22 12:11


I haven't solved the problem completely, but I have fixed it partly:

  • Increase the driver memory. As mentioned above, the job stopped in round 6 or 7, but after I doubled the driver memory it stopped at round 14 instead. So I think running out of driver memory is one of the causes.
  • Save the candidates RDD to HDFS and resume the process from there on the next run, so that the earlier computation is not wasted.
  • Serialize the candidates RDD with Kryo. This costs some computation for encoding and decoding, but saves a great deal of memory.
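The last two points can be sketched roughly as follows. This is not the exact code I ran: CandidateStore, its method names and the path argument are hypothetical, and saveAsObjectFile / objectFile are just one possible way to write the RDD out and read it back.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CandidateStore {
  type Candidates = RDD[(Long, (Double, Double))] // (vid, (low, high))

  // With this setting Kryo is used for shuffles and for the *_SER storage levels.
  def kryoConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  // Cache in serialized form: more CPU per access, less heap per record.
  def cacheSerialized(candidates: Candidates): Candidates =
    candidates.persist(StorageLevel.MEMORY_AND_DISK_SER)

  // Write the current candidates out so that a later run can resume from them.
  def save(candidates: Candidates, path: String): Unit =
    candidates.saveAsObjectFile(path)

  // Read back candidates that an earlier run saved under the same path.
  def load(sc: SparkContext, path: String): Candidates =
    sc.objectFile[(Long, (Double, Double))](path)
}
```

A resumed run would call load at start-up instead of rebuilding candidates from g.vertices, and would also need deltaLow and deltaHigh from the previous run, which this sketch does not store.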

These are not perfect solutions, but they do work in my case. I still hope someone else can offer a complete one.

bourneli answered Nov 14 '22 13:11