 

Partitioning with Spark GraphFrames

I'm working with a largish (?) graph (60 million vertices and 9.5 billion edges) using Spark GraphFrames. The underlying data is not large: the vertices take about 500 MB on disk and the edges about 40 GB. My containers are frequently shutting down due to Java heap out-of-memory errors, but I think the underlying problem is that the GraphFrame is constantly shuffling data around (I'm seeing shuffle read/write of up to 150 GB). Is there a way to efficiently partition a GraphFrame, or the underlying edges/vertices, to reduce shuffle?

asked Dec 27 '16 by John

People also ask

How many partitions should I use in Spark?

The general recommendation for Spark is to use about 4x as many partitions as there are cores available to the application; as an upper bound on that number, each task should still take at least 100 ms to execute.
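
As a rough illustration, a minimal Scala sketch of that rule of thumb (the SparkSession spark and the DataFrame df are illustrative names, not from the question):

val cores = spark.sparkContext.defaultParallelism  // typically the total cores across executors
val numPartitions = cores * 4                      // ~4x cores rule of thumb
val repartitioned = df.repartition(numPartitions)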

What are GraphFrames in Pyspark?

GraphFrames is a package for Apache Spark that provides DataFrame-based graphs. It provides high-level APIs in Java, Python, and Scala. It aims to provide both the functionality of GraphX and extended functionality taking advantage of Spark DataFrames.
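
For illustration, a minimal Scala sketch that builds a GraphFrame from two DataFrames (assumes the graphframes package is on the classpath and a SparkSession named spark; the data and the extra column names are made up):

import org.graphframes.GraphFrame

val vertices = spark.createDataFrame(Seq((1L, "Alice"), (2L, "Bob"))).toDF("id", "name")
val edges = spark.createDataFrame(Seq((1L, 2L, "follows"))).toDF("src", "dst", "relationship")

val g = GraphFrame(vertices, edges)  // vertices need an "id" column, edges need "src" and "dst"
g.inDegrees.show()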

What is the default partitioning in Spark?

By default, it is set to the total number of cores on all the executor nodes. Partitions in Spark do not span multiple machines. Tuples in the same partition are guaranteed to be on the same machine. Spark assigns one task per partition and each worker can process one task at a time.
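
A small sketch for inspecting this (assuming an existing SparkSession named spark):

println(spark.sparkContext.defaultParallelism)  // default parallelism, typically total executor cores
val df = spark.range(0L, 1000000L)
println(df.rdd.getNumPartitions)                // one task is scheduled per partition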


2 Answers

TL;DR It is not possible to efficiently partition a GraphFrame.

GraphFrame algorithms can be separated into two categories:

  • Methods which delegate processing to the GraphX counterpart. GraphX supports a number of partitioning methods, but these are not exposed via the GraphFrame API. If you use one of these, it is probably better to use GraphX directly (a sketch follows after this list).

    Unfortunately, GraphX development has stopped almost completely, with only a handful of small fixes over the last two years, and its overall performance is highly disappointing compared to both in-core and out-of-core libraries.

  • Methods which are implemented natively using Spark Datasets, which, given the limited programming model and the single partitioning mode, are deeply unfit for complex graph processing.

    While relational columnar storage can be used for efficient graph processing, the naive iterative join approach employed by GraphFrames just doesn't scale (although it is OK for shallow traversals with one or two hops).

    You can try to repartition the vertices and edges DataFrames by id and src respectively:

    val nPart: Int = ???
    
    GraphFrame(v.repartition(nPart, v("id")), e.repartition(nPart, e("src")))
    

    which should help in some cases.
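
If you do go the GraphX route mentioned in the first bullet, partitioning is exposed directly via partitionBy. A hedged sketch, assuming numeric (Long) vertex ids and GraphFrames-style v / e DataFrames; the conversion shown is illustrative, not a GraphFrames API:

import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

// assumes "id", "src" and "dst" hold Long ids
val vertexRDD = v.rdd.map(row => (row.getAs[Long]("id"), ()))
val edgeRDD   = e.rdd.map(row => Edge(row.getAs[Long]("src"), row.getAs[Long]("dst"), ()))

val graph = Graph(vertexRDD, edgeRDD)
  .partitionBy(PartitionStrategy.EdgePartition2D)  // one of GraphX's built-in strategies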

Overall, in its current (Dec 2016) state, Spark is not a good choice for intensive graph analytics.

answered Sep 20 '22 by user7347764


Here's a partial solution / workaround: create a UDF that mimics one of the partitioning functions, use it to create a new column, and partition on that.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

num_parts = 256
# mimic GraphX's RandomVertexCut: hash the (src, dst) pair into num_parts buckets
random_vertex_cut = udf(lambda src, dst: abs(hash((src, dst))) % num_parts, IntegerType())

edge = edge.withColumn("v_cut", random_vertex_cut(col("src"), col("dst"))).repartition(num_parts, "v_cut")

This approach can help some, but not as well as GraphX.

answered Sep 20 '22 by John