Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark example program runs very slow

I tried to use Spark to work on simple graph problem. I found an example program in Spark source folder: transitive_closure.py, which computes the transitive closure in a graph with no more than 200 edges and vertices. But in my own laptop, it runs more than 10 minutes and doesn't terminate. The command line I use is: spark-submit transitive_closure.py.

I wonder why spark is so slow even when computing just such small transitive closure result? Is it a common case? Is there any configuration I miss?

The program is shown below, and can be found in spark install folder at their website.

from __future__ import print_function

import sys
from random import Random

from pyspark import SparkContext

numEdges = 200
numVertices = 100
rand = Random(42)


def generateGraph():
    edges = set()
    while len(edges) < numEdges:
        src = rand.randrange(0, numEdges)
        dst = rand.randrange(0, numEdges)
        if src != dst:
            edges.add((src, dst))
    return edges


if __name__ == "__main__":
    """
    Usage: transitive_closure [partitions]
    """
    sc = SparkContext(appName="PythonTransitiveClosure")
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    tc = sc.parallelize(generateGraph(), partitions).cache()

    # Linear transitive closure: each round grows paths by one edge,
    # by joining the graph's edges with the already-discovered paths.
    # e.g. join the path (y, z) from the TC with the edge (x, y) from
    # the graph to obtain the path (x, z).

    # Because join() joins on keys, the edges are stored in reversed order.
    edges = tc.map(lambda x_y: (x_y[1], x_y[0]))

    oldCount = 0
    nextCount = tc.count()
    while True:
        oldCount = nextCount
        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
        # then project the result to obtain the new (x, z) paths.
        new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
        tc = tc.union(new_edges).distinct().cache()
        nextCount = tc.count()
        if nextCount == oldCount:
            break

    print("TC has %i edges" % tc.count())

    sc.stop()
like image 832
c21 Avatar asked Feb 22 '16 23:02

c21


People also ask

Why is my Spark job so slow?

Sometimes, Spark runs slowly because there are too many concurrent tasks running. The capacity for high concurrency is a beneficial feature, as it provides Spark-native fine-grained sharing. This leads to maximum resource utilization while cutting down query latencies.

How can I make my Spark work faster?

Ideally, Spark organises one thread per task and per CPU core. Each task is related to a single partition. Thus, a first intuition is to configure a number of partitions at least as large as the number of available CPU cores. All cores should be occupied most of the time during the execution of the Spark job.

How do I fix a Spark memory problem?

You can resolve it by setting the partition size: increase the value of spark. sql. shuffle. partitions.


1 Answers

There can many reasons why this code doesn't perform particularly well on your machine but most likely this is just another variant of the problem described in Spark iteration time increasing exponentially when using join. The simplest way to check if it is indeed the case is to provide spark.default.parallelism parameter on submit:

bin/spark-submit --conf spark.default.parallelism=2 \
  examples/src/main/python/transitive_closure.py

If not limited otherwise, SparkContext.union, RDD.join and RDD.union set a number of partitions of the child to the total number of partitions in the parents. Usually it is a desired behavior but can become extremely inefficient if applied iteratively.

like image 102
zero323 Avatar answered Oct 20 '22 11:10

zero323