Spark - GraphX - scaling connected components

I am trying to use connected components, but I am having an issue with scaling. Here is what I have:

// get vertices
val vertices = stage_2.flatMap(x => GraphUtil.getVertices(x)).cache

// get edges
val edges = stage_2.map(x => GraphUtil.getEdges(x)).filter(_ != null).flatMap(x => x).cache

// create graph  
val identityGraph = Graph(vertices, edges)

// get connected components
val cc = identityGraph.connectedComponents.vertices

Here, GraphUtil has helper functions that return the vertices and edges. At this point, my graph has ~1 million vertices and ~2 million edges (this is expected to grow to ~100 million nodes). The graph is pretty sparsely connected, so I expect plenty of small components.
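
For context, a hypothetical sketch of what those helpers might look like (the real GraphUtil is not shown in the question; the input format and id derivation below are assumptions, and only the return types are dictated by what Graph(vertices, edges) accepts):

// Hypothetical sketch; assumes each input record is a comma-separated
// list of ids. Not the actual GraphUtil from the question.
import org.apache.spark.graphx.{Edge, VertexId}

object GraphUtil {
  private def toId(s: String): VertexId = s.trim.hashCode.toLong

  // one (VertexId, attribute) pair per token in the record
  def getVertices(record: String): Seq[(VertexId, String)] =
    record.split(",").map(t => (toId(t), t.trim)).toSeq

  // an edge from the first id to every other id; null when the record
  // has nothing to connect (matching the filter(_ != null) above)
  def getEdges(record: String): Seq[Edge[Int]] = {
    val ids = record.split(",").map(toId)
    if (ids.length < 2) null
    else ids.tail.map(dst => Edge(ids.head, dst, 1)).toSeq
  }
}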

When I run the above, I keep getting java.lang.OutOfMemoryError: Java heap space. I have tried an executor-memory of 32g on a 15-node cluster with a YARN container size of 45g.
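
For reference, that setup corresponds roughly to the following configuration (a sketch only; the app name is a placeholder and one executor per node is assumed):

import org.apache.spark.SparkConf

// Sketch of the configuration described above; the app name is a
// placeholder, and spark.executor.instances assumes one executor per node.
val conf = new SparkConf()
  .setAppName("graphx-connected-components")   // placeholder name
  .set("spark.executor.memory", "32g")
  .set("spark.executor.instances", "15")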

Here is the exception detail:

16/10/26 10:32:26 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:360)
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:98)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2216)
    at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
    at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:173)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

In addition, I am seeing plenty of log lines like the following:

16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 320 is 263 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 321 is 268 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 322 is 264 bytes

My question is: has anyone tried ConnectedComponents at this scale? If so, what am I doing wrong?

Asked Oct 26 '16 by Shirish Kumar

1 Answer

As I posted in the comments above, I implemented connected components using map/reduce on Spark. You can find more details here: https://www.linkedin.com/pulse/connected-component-using-map-reduce-apache-spark-shirish-kumar, and the source code, under the MIT license, here: https://github.com/kwartile/connected-component.
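
For illustration, here is a minimal RDD-based sketch of the general idea behind a map/reduce connected-components pass: each vertex repeatedly adopts the smallest id seen among itself and its neighbours until nothing changes. This is a simplified teaching version, not the implementation in the linked repository:

import org.apache.spark.rdd.RDD

// Each vertex's label converges to the smallest vertex id in its component.
def connectedComponents(edges: RDD[(Long, Long)]): RDD[(Long, Long)] = {
  // make edges bidirectional so labels can flow both ways
  val undirected = edges.flatMap { case (a, b) => Seq((a, b), (b, a)) }.cache()

  // start with each vertex labelled by its own id
  var labels = undirected.keys.distinct().map(v => (v, v)).cache()
  var changed = true

  while (changed) {
    // every vertex offers its current label to each neighbour ...
    val offers = undirected.join(labels).map { case (_, (nbr, lbl)) => (nbr, lbl) }
    // ... and every vertex keeps the smallest label it has seen
    val updated = labels.union(offers)
      .reduceByKey((a, b) => math.min(a, b))
      .cache()
    // stop once no label shrank in this round
    changed = updated.join(labels).filter { case (_, (n, o)) => n < o }.count() > 0
    labels = updated
  }
  labels // (vertexId, componentId), where componentId = min id in the component
}

The number of passes grows with the diameter of the largest component, so a sparsely connected graph made of many small components, as in the question, converges in only a few iterations.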

Answered Sep 18 '22 by Shirish Kumar