For testing purposes, while I don't have a production cluster, I am using Spark locally:
from pyspark import SparkConf, SparkContext
print('Setting SparkContext...')
sconf = SparkConf()
sconf.setAppName('myLocalApp')
sconf.setMaster('local[*]')  # run locally, one worker thread per logical core
sc = SparkContext(conf=sconf)
print('Setting SparkContext...OK!')
Also, I am using a very small dataset, consisting of only 20 rows in a PostgreSQL database (~2 kB).
Also(!), my code is quite simple as well: it only groups the 20 rows by a key and applies a trivial map operation:
params = [object1, object2]
rdd = df.rdd.keyBy(lambda x: (x.a, x.b, x.c)) \
.groupByKey() \
.mapValues(lambda value: self.__data_interpolation(value, params))
def __data_interpolation(self, data, params):
    # TODO: only for testing
    return data
What bothers me is that the whole execution takes about 5 minutes!!
Inspecting the Spark UI, I see that most of the time was spent in Stage 6: the keyBy method. (Stage 7, the collect() method, was also slow...)
Some info:
These numbers make no sense to me... Why do I need 22 tasks, executing for 54 sec, to process less than 1 kb of data?
Can it be a network issue, trying to figure out the IP address of localhost? I don't know... Any clues?
On your local machine, inside the /home/jung/sparkapp/ directory, which contains the application source code, run $ sbt package to build the application JAR. Let's deploy our application to our Spark cluster. First, we need to scp the JAR file we created to the master instance. Then, ssh into the master instance.
It's easy to run Spark locally on one machine: all you need is Java installed and on your system PATH, or the JAVA_HOME environment variable pointing to a Java installation. Spark runs on Java 8/11/17, Scala 2.12/2.13, Python 3.7+ and R 3.5+.
setMaster('local[*]') runs Spark locally with as many worker threads as logical cores on your machine. Alternatively, you can set this value with the --master option of the spark-shell or spark-submit command.
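To make the meaning of the local master strings concrete, here is a minimal pure-Python sketch of how many worker threads each simple form implies. The helper name local_thread_count is hypothetical and this is not Spark's own parsing code; the core count is approximated with os.cpu_count(), and forms such as local[N, F] are deliberately not handled.

```python
import os
import re


def local_thread_count(master: str) -> int:
    """Illustrative only: worker threads implied by a simple local master URL."""
    if master == "local":
        # Plain 'local' means a single worker thread.
        return 1
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master URL: {master!r}")
    token = match.group(1)
    if token == "*":
        # 'local[*]' means one thread per logical core (approximated here).
        return os.cpu_count() or 1
    # 'local[N]' means exactly N worker threads.
    return int(token)


if __name__ == "__main__":
    for url in ("local", "local[2]", "local[*]"):
        print(url, "->", local_thread_count(url), "thread(s)")
```

For a 20-row dataset, comparing a run with local[1] against local[*] is one cheap way to see whether thread count matters at all.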
It appears the main reason for the slower performance in your code snippet is the use of groupByKey(). The issue with groupByKey is that it ends up shuffling all of the key-value pairs, resulting in a lot of data being transferred unnecessarily. A good reference that explains this issue is Avoid GroupByKey.
To work around this issue, you can use reduceByKey, which should be faster (more info is also included in the above Avoid GroupByKey link).
By the way, reviewing the Spark UI diagram above, the #22 refers to the task # within the DAG (not the number of tasks executed).
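The difference between the two operations can be illustrated without Spark. The following is a pure-Python sketch, not Spark code: the partitions are plain lists, and the function names (records_shuffled_group_by_key, pre_combine, merge_shuffled) are made up for this example. It counts how many records would have to leave their partition under each strategy.

```python
def records_shuffled_group_by_key(partitions):
    """groupByKey style: every (key, value) pair leaves its partition."""
    return sum(len(part) for part in partitions)


def pre_combine(partitions, combine):
    """reduceByKey style: combine values per key inside each partition first,
    so at most one record per key leaves each partition."""
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = combine(local[key], value) if key in local else value
        shuffled.extend(local.items())
    return shuffled


def merge_shuffled(shuffled, combine):
    """Final merge of the pre-combined records after the (simulated) shuffle."""
    result = {}
    for key, value in shuffled:
        result[key] = combine(result[key], value) if key in result else value
    return result


if __name__ == "__main__":
    # Two simulated partitions holding six (key, value) pairs in total.
    partitions = [
        [("a", 1), ("a", 2), ("b", 3)],
        [("a", 4), ("b", 5), ("b", 6)],
    ]
    add = lambda x, y: x + y
    shuffled = pre_combine(partitions, add)
    print("groupByKey style, records shuffled: ", records_shuffled_group_by_key(partitions))
    print("reduceByKey style, records shuffled:", len(shuffled))
    print("per-key sums:", merge_shuffled(shuffled, add))
```

Note that reduceByKey only fits when the per-key work can be written as an associative and commutative combine function. A step that needs all values of a key at once, as the interpolation in the question may, cannot simply be swapped over.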
HTH!
I suppose "postgresql" is the key to solving that puzzle.
keyBy is probably the first operation that really uses the data, so its execution time is longer because it needs to fetch the data from the external database. You can verify this by adding at the beginning:
df.cache()
df.count() # to fill the cache
df.rdd.keyBy....
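To put numbers on this check, each step can be timed separately. Below is a minimal sketch of a timing helper; the name timed is hypothetical, and the Spark calls appear only as comments because they assume the df and rdd variables from the question.

```python
import time


def timed(label, action):
    """Run a zero-argument callable, print how long it took,
    and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.2f}s")
    return result, elapsed


if __name__ == "__main__":
    # Stand-in workload so the sketch runs without Spark.
    timed("sum of a small range", lambda: sum(range(1000)))

    # Assumed usage with the objects from the question (not run here):
    #   df.cache()
    #   timed("load from PostgreSQL", df.count)      # fills the cache
    #   timed("keyBy + groupByKey", rdd.collect)     # should now skip the database
```

If the first timing dominates, the time is going into reading from the database rather than into the grouping itself.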
If I am right, you need to optimize the database. It may be: