Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Count on Spark Dataframe is extremely slow

I'm creating a new DataFrame with a handful of records from a Join.

val joined_df = first_df.join(second_df, first_df.col("key") ===
second_df.col("key") && second_df.col("key").isNull, "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()

Everything is fast (under one second) except the count operation. The RDD conversion kicks in and literally takes hours to complete. Is there any way to speed things up?

INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
like image 419
br0ken.pipe Avatar asked Jul 17 '17 10:07

br0ken.pipe


People also ask

Why is Spark show so slow?

Sometimes, Spark runs slowly because there are too many concurrent tasks running. The capacity for high concurrency is a beneficial feature, as it provides Spark-native fine-grained sharing. This leads to maximum resource utilization while cutting down query latencies.

Is Spark slower than pandas?

Due to parallel execution on all cores on multiple machines, PySpark runs operations faster than Pandas, hence we often required to covert Pandas DataFrame to PySpark (Spark with Python) for better performance. This is one of the major differences between Pandas vs PySpark DataFrame.

How does count work in PySpark?

The count function counts the data and returns the data to the driver in PySpark, making the type action in PySpark. This count function in PySpark is used to count the number of rows that are present in the data frame post/pre-data analysis. b: The data frame created.


1 Answers

Everything is fast (under one second) except the count operation.

This is justified as follow : all operations before the count are called transformations and this type of spark operations are lazy i.e. it doesn't do any computation before calling an action (count in your example).

The second problem is in the repartition(1) :

keep in mind that you'll lose all the parallelism offered by spark and you computation will be run in one executor (core if your are in standalone mode), so you must remove this step or change 1 to a number propositional to the number of your CPU cores (standalone mode) or the number of executors (cluster mode).

The RDD conversion kicks in and literally takes hours to complete.

If I understand correctly you would covert the DataFrame to an RDD, this is really a bad practice in spark and you should avoid such conversion as possible as you can. this is because the data in DataFrame and Dataset are encoded using special spark encoders (it's called tungstant if I well remembered it) which take much less memory then the JVM serialization encoders, so such conversion mean that spark will change the type of your data from his own one (which take much less memory and let spark optimize a lot of commutations by just work the encoded data and not serialize the data to work with and then deserialize it) to the JVM data type and this why DataFrames and Datasets are very powerful than RDDs

Hope this help you

like image 173
Haroun Mohammedi Avatar answered Oct 08 '22 18:10

Haroun Mohammedi