
How can I process independent graph files in parallel across the cluster nodes in Apache Spark?

Let's say I have a large number of graph files, each with around 500K edges. I have been processing these graph files on Apache Spark, and I am wondering how to parallelize the whole job efficiently. Since every graph file is currently independent of the others, I am looking for parallelism across files. So, if I have 100 graph files and a 20-node cluster, can I process whole files on individual nodes, so that each node handles 5 files? What happens now is that a single graph is processed in a number of stages, which causes a lot of shuffling.

from pyspark.sql.functions import lit
from graphframes import GraphFrame

graphFile = "/mnt/bucket/edges"  # This directory has 100 graph files, each file with around 500K edges
nodeFile = "/mnt/bucket/nodes"  # This directory has node files

# Parse each "src dst" line into a tuple of integers
graphData = sc.textFile(graphFile).map(lambda line: line.split(" ")).map(lambda edge: (int(edge[0]), int(edge[1])))
# DataFrame created so as to work with GraphFrames
graphDataFrame = sqlContext.createDataFrame(graphData, ['src', 'dst']).withColumn("relationship", lit('edges'))

# split() with no argument splits on any whitespace; split("\s") would only match a literal backslash followed by "s"
nodeData = sc.textFile(nodeFile).map(lambda line: line.split()).map(lambda node: (int(node[0]),))
nodeDataFrame = sqlContext.createDataFrame(nodeData, ['id'])

graphGraphFrame = GraphFrame(nodeDataFrame, graphDataFrame)
connectedComponent = graphGraphFrame.connectedComponents()

The problem is that it takes a lot of time to process even a couple of files, and I have to process around 20K files, each with 800K edges. Maybe if a data partitioning strategy could be found that ensures all dependent edges are processed on a single node, there would be less shuffling.

Or what is the best way of solving this efficiently?

hsuk asked Aug 17 '16 15:08



1 Answer

TL;DR Apache Spark is not the right tool for the job.

Spark's main scope is data parallelism, but what you're looking for is task parallelism. Theoretically, the core Spark engine is generic enough to achieve limited task parallelism as well, but in practice there are better tools for a job like this, and it is definitely not the goal of libraries like GraphX and GraphFrames.
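As a rough illustration of the limited task parallelism mentioned above, a list of file paths (rather than the file contents) can be parallelized with one path per partition, so that each task handles a whole file locally. This is only a sketch: it assumes `sc` is an existing SparkContext and that every worker can read the paths (for example through a shared mount), and `count_edges` and `process_files_as_tasks` are hypothetical names, with `count_edges` standing in for real per-file graph processing.

```python
def count_edges(path):
    """Process one whole graph file inside a single task (placeholder work)."""
    with open(path) as handle:
        # Count the non-blank lines, i.e. the edges listed in the file
        return path, sum(1 for line in handle if line.strip())


def process_files_as_tasks(sc, paths):
    """Run count_edges once per file, with one file per Spark partition.

    Assumption: `sc` is an existing SparkContext and every path is
    readable from each worker node.
    """
    paths = list(paths)
    # One slice per path gives each file its own partition, hence its own task
    num_slices = max(1, len(paths))
    return sc.parallelize(paths, num_slices).map(count_edges).collect()
```

Nothing is shuffled here because each file is read and processed entirely inside one task, but Spark then acts only as a task scheduler.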

Since data distribution is the core assumption behind these libraries, their algorithms are implemented using techniques like message passing or joins, which is reflected in the multistage job structure and the shuffles. If the data fits in main memory (you can easily process graphs with millions of edges on a single node using optimized graph processing libraries), these techniques are completely useless in practice.

Given the piece of code you've shown, an in-core graph processing library like igraph or NetworkX (better documented and much more comprehensive, but unfortunately memory hungry and somewhat slow) combined with GNU Parallel should be more than enough and much more efficient in practice. For more complex jobs, you may consider using a full-featured workflow management tool like Airflow or Luigi.
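A minimal single-machine sketch of that idea follows. To stay dependency-free it swaps in a plain union-find for igraph or NetworkX, and the standard library's `ProcessPoolExecutor` for GNU Parallel, so it is not what those tools do internally. The function names and the "src dst" line format are assumptions taken from the question's code, and nodes that appear only in the node files (isolated nodes) are not counted.

```python
from concurrent.futures import ProcessPoolExecutor


def find(parent, node):
    """Return the root of node's set, compressing the path on the way."""
    root = node
    while parent[root] != root:
        root = parent[root]
    # Second pass: point every node on the path directly at the root
    while parent[node] != root:
        parent[node], node = root, parent[node]
    return root


def count_components(path):
    """Count connected components in one edge-list file ("src dst" per line)."""
    parent = {}
    with open(path) as handle:
        for line in handle:
            fields = line.split()
            if len(fields) < 2:
                continue  # skip blank or malformed lines
            src, dst = int(fields[0]), int(fields[1])
            parent.setdefault(src, src)
            parent.setdefault(dst, dst)
            root_src, root_dst = find(parent, src), find(parent, dst)
            if root_src != root_dst:
                parent[root_src] = root_dst  # merge the two components
    # Each remaining root represents exactly one component
    return sum(1 for node, up in parent.items() if node == up)


def process_all(paths, workers=4):
    """Process each file in its own worker process; returns {path: count}."""
    paths = list(paths)
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return dict(zip(paths, pool.map(count_components, paths)))
```

Each file is loaded and processed entirely within one process, so there is no shuffling between workers. When run as a script, `process_all` should be called from under an `if __name__ == "__main__":` guard so that worker processes can import the module cleanly.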

zero323 answered Nov 10 '22 00:11
