I tried to use Spark to work on a simple graph problem. I found an example program in the Spark source folder, transitive_closure.py, which computes the transitive closure of a graph with no more than 200 edges and vertices. But on my own laptop it runs for more than 10 minutes and doesn't terminate. The command line I use is: spark-submit transitive_closure.py.
I wonder why Spark is so slow when computing such a small transitive closure. Is this common? Is there any configuration I am missing?
The program is shown below; it also ships in the Spark install folder available from their website.
from __future__ import print_function
import sys
from random import Random
from pyspark import SparkContext
numEdges = 200
numVertices = 100
rand = Random(42)

def generateGraph():
    edges = set()
    while len(edges) < numEdges:
        src = rand.randrange(0, numEdges)
        dst = rand.randrange(0, numEdges)
        if src != dst:
            edges.add((src, dst))
    return edges

if __name__ == "__main__":
    """
    Usage: transitive_closure [partitions]
    """
    sc = SparkContext(appName="PythonTransitiveClosure")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    tc = sc.parallelize(generateGraph(), partitions).cache()

    # Linear transitive closure: each round grows paths by one edge,
    # by joining the graph's edges with the already-discovered paths.
    # e.g. join the path (y, z) from the TC with the edge (x, y) from
    # the graph to obtain the path (x, z).

    # Because join() joins on keys, the edges are stored in reversed order.
    edges = tc.map(lambda x_y: (x_y[1], x_y[0]))

    oldCount = 0
    nextCount = tc.count()
    while True:
        oldCount = nextCount
        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
        # then project the result to obtain the new (x, z) paths.
        new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
        tc = tc.union(new_edges).distinct().cache()
        nextCount = tc.count()
        if nextCount == oldCount:
            break

    print("TC has %i edges" % tc.count())

    sc.stop()
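For a sense of scale, the same linear transitive closure can be sketched in plain Python, without Spark. This is only an illustrative sketch and not part of the Spark example: the names generate_graph and transitive_closure and the dict-based index are my own choices, and the generator just mirrors the one in the listing above.

```python
from collections import defaultdict
from random import Random


def generate_graph(num_edges=200, seed=42):
    """Mirror the example's generator: endpoints are drawn from [0, num_edges)."""
    rand = Random(seed)
    edges = set()
    while len(edges) < num_edges:
        src = rand.randrange(0, num_edges)
        dst = rand.randrange(0, num_edges)
        if src != dst:
            edges.add((src, dst))
    return edges


def transitive_closure(edges):
    """Grow paths by one edge per round until no new path appears."""
    # Index the graph's edges by destination: preds[y] = {x | (x, y) is an edge}.
    preds = defaultdict(set)
    for x, y in edges:
        preds[y].add(x)

    tc = set(edges)
    while True:
        # Join each known path (y, z) with each edge (x, y) to get the path (x, z).
        new_paths = {(x, z) for (y, z) in tc for x in preds.get(y, ())}
        if new_paths <= tc:
            return tc
        tc |= new_paths


if __name__ == "__main__":
    print("TC has %i edges" % len(transitive_closure(generate_graph())))
```

If this finishes quickly on the same laptop, the slowdown in the Spark job is unlikely to come from the size of the closure itself.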
Sometimes, Spark runs slowly because there are too many concurrent tasks running. The capacity for high concurrency is a beneficial feature, as it provides Spark-native fine-grained sharing. This leads to maximum resource utilization while cutting down query latencies.
Ideally, Spark organises one thread per task and per CPU core. Each task is related to a single partition. Thus, a first intuition is to configure a number of partitions at least as large as the number of available CPU cores. All cores should be occupied most of the time during the execution of the Spark job.
You can address this by setting the number of shuffle partitions: increase the value of spark.sql.shuffle.partitions.
There can be many reasons why this code doesn't perform particularly well on your machine, but most likely this is just another variant of the problem described in "Spark iteration time increasing exponentially when using join". The simplest way to check whether that is indeed the case is to provide the spark.default.parallelism parameter on submit:
bin/spark-submit --conf spark.default.parallelism=2 \
examples/src/main/python/transitive_closure.py
If not limited otherwise, SparkContext.union, RDD.join and RDD.union set the number of partitions of the child to the total number of partitions in the parents. Usually this is the desired behavior, but it can become extremely inefficient if applied iteratively.
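As an alternative to setting spark.default.parallelism globally, the partition count can be pinned inside the loop, since both RDD.join and RDD.distinct accept an optional numPartitions argument. The following is a minimal sketch of that idea, not the version shipped with Spark: the function name transitive_closure_count and its num_partitions parameter are hypothetical, and sc is assumed to be an already created SparkContext.

```python
def transitive_closure_count(sc, graph_edges, num_partitions=2):
    """Count the paths in the transitive closure of graph_edges.

    sc is an existing SparkContext. num_partitions is a hypothetical
    parameter of this sketch that is passed to every shuffle in the loop.
    """
    tc = sc.parallelize(list(graph_edges), num_partitions).cache()
    # Reversed edges, keyed by destination, as in the original example.
    edges = tc.map(lambda x_y: (x_y[1], x_y[0])).cache()

    old_count = 0
    next_count = tc.count()
    while next_count != old_count:
        old_count = next_count
        # Passing numPartitions keeps join() from inheriting the
        # partition count of its parents.
        new_edges = tc.join(edges, num_partitions).map(
            lambda kv: (kv[1][1], kv[1][0]))
        # union() adds up the partitions of both inputs, so the
        # following distinct() is pinned to num_partitions again.
        tc = tc.union(new_edges).distinct(num_partitions).cache()
        next_count = tc.count()
    return next_count
```

With the question's script this could be called as transitive_closure_count(sc, generateGraph(), partitions). Printing tc.getNumPartitions() inside the loop, with and without the explicit arguments, is a simple way to confirm whether partition growth is what slows the job down.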