
Spark on localhost

For testing purposes, while I don't have a production cluster, I am using Spark locally:

from pyspark import SparkConf, SparkContext

print('Setting SparkContext...')
sconf = SparkConf()
sconf.setAppName('myLocalApp')
sconf.setMaster('local[*]')  # as many worker threads as logical cores
sc = SparkContext(conf=sconf)
print('Setting SparkContext...OK!')

Also, I am using a very small dataset, consisting of only 20 rows in a PostgreSQL database (~2 KB).

Also(!), my code is quite simple as well: it just groups those 20 rows by a key and applies a trivial map operation:

params = [object1, object2]
rdd = df.rdd.keyBy(lambda x: (x.a, x.b, x.c)) \
            .groupByKey() \
            .mapValues(lambda value: self.__data_interpolation(value, params))


def __data_interpolation(self, data, params):
    # TODO: only for testing
    return data

What bothers me is that the whole execution takes about 5 minutes!

Inspecting the Spark UI, I see that most of the time was spent in Stage 6, the byKey step. (Stage 7, the collect(), was also slow...)

Some info:

[Spark UI screenshots: stage durations and task counts]

These numbers make no sense to me... Why do I need 22 tasks, executing for 54 seconds, to process less than 1 KB of data?

Could it be a network issue, with Spark trying to figure out the IP address of localhost? I don't know... Any clues?

asked Nov 03 '16 by guilhermecgs


2 Answers

It appears the main reason for the slow performance of your code snippet is the use of groupByKey(). The issue with groupByKey is that it ends up shuffling all of the key-value pairs, so a lot of data is transferred unnecessarily. A good reference explaining this issue is Avoid GroupByKey.

To work around this issue, you can:

  1. Try using reduceByKey, which should be faster (more info is in the Avoid GroupByKey link above).
  2. Use DataFrames (instead of RDDs), as DFs include performance optimizations (and the DF groupBy statement is faster than the RDD version). Also, since you're using Python, DataFrames let you avoid the Python-to-JVM serialization overhead of PySpark RDDs. More information on this can be seen in PySpark Internals. A sketch of both options follows below.
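
For illustration, here is a minimal sketch of both options. The DataFrame df and its columns (a, b, c, value) are hypothetical stand-ins for your PostgreSQL table:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master('local[*]').appName('myLocalApp').getOrCreate()

# hypothetical stand-in for the 20-row PostgreSQL table
df = spark.createDataFrame(
    [('x', 1, 'p', 10.0), ('x', 1, 'p', 20.0), ('y', 2, 'q', 5.0)],
    ['a', 'b', 'c', 'value'])

# Option 1: reduceByKey combines values map-side before the shuffle,
# so far less data crosses the network than with groupByKey
sums = (df.rdd
          .map(lambda row: ((row.a, row.b, row.c), row.value))
          .reduceByKey(lambda x, y: x + y))

# Option 2: the DataFrame groupBy goes through the Catalyst optimizer
# and avoids serializing every record between the JVM and Python
totals = df.groupBy('a', 'b', 'c').agg(F.sum('value').alias('total'))

Note that reduceByKey only applies when the per-key logic can be expressed as an associative combine; for something like an interpolation over the whole group, the DataFrame route is the more natural fit.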

By the way, reviewing the Spark UI diagram above, the #22 refers to the task # within the DAG (not the number of tasks executed).

HTH!

answered Oct 23 '22 by Denny Lee


I suppose the PostgreSQL source is the key to solving this puzzle.

keyBy is probably the first operation that really uses the data, so its execution time is longer: it first needs to fetch the data from the external database. You can verify this by adding at the beginning:

df.cache()
df.count()  # forces the read from the database and fills the cache
df.rdd.keyBy....  # now runs against cached data
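
To make the effect visible, you could time the two phases separately; a rough sketch (assumes df is already loaded from the database):

import time

t0 = time.time()
df.cache()
df.count()  # triggers the actual read from PostgreSQL and fills the cache
print('database read: %.1f s' % (time.time() - t0))

t1 = time.time()
df.rdd.keyBy(lambda x: (x.a, x.b, x.c)).groupByKey().count()  # runs on cached data
print('keyBy/groupByKey: %.1f s' % (time.time() - t1))

If the first phase dominates, the time is going into fetching the data, not into keyBy itself.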

If I am right, you need to optimize the database access. The cause may be:

  1. A network issue (a slow network to the DB server)
  2. Complicated (and slow) SQL on this database (try running it in a PostgreSQL shell such as psql)
  3. Authorization difficulties on the DB server
  4. A problem with the JDBC driver you use (a sketch of an explicitly configured JDBC read follows below)
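
To check the JDBC side in particular, here is a minimal sketch of an explicitly configured read; the URL, table name, and credentials are placeholders, and the fetchsize value is illustrative:

# hypothetical connection details; replace with your own
df = (spark.read.format('jdbc')
      .option('url', 'jdbc:postgresql://localhost:5432/mydb')
      .option('dbtable', 'my_table')
      .option('user', 'myuser')
      .option('password', 'mypassword')
      .option('driver', 'org.postgresql.Driver')  # driver JAR must be on the classpath
      .option('fetchsize', '1000')  # rows fetched per round trip to the database
      .load())

This assumes a SparkSession named spark. With only 20 rows the tuning itself won't matter much, but timing this read in isolation tells you whether the database connection is the bottleneck.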
answered Oct 23 '22 by Mariusz