I'm creating a new DataFrame with a handful of records from a Join.
val joined_df = first_df.join(second_df, first_df.col("key") ===
second_df.col("key") && second_df.col("key").isNull, "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()
Everything is fast (under one second) except the count operation. The RDD conversion kicks in and literally takes hours to complete. Is there any way to speed things up?
INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
Sometimes, Spark runs slowly because there are too many concurrent tasks running. The capacity for high concurrency is a beneficial feature, as it provides Spark-native fine-grained sharing. This leads to maximum resource utilization while cutting down query latencies.
Due to parallel execution on all cores on multiple machines, PySpark runs operations faster than Pandas, hence we often required to covert Pandas DataFrame to PySpark (Spark with Python) for better performance. This is one of the major differences between Pandas vs PySpark DataFrame.
The count function counts the data and returns the data to the driver in PySpark, making the type action in PySpark. This count function in PySpark is used to count the number of rows that are present in the data frame post/pre-data analysis. b: The data frame created.
Everything is fast (under one second) except the count operation.
This is justified as follow : all operations before the count
are called transformations and this type of spark operations are lazy i.e. it doesn't do any computation before calling an action (count
in your example).
The second problem is in the repartition(1)
:
keep in mind that you'll lose all the parallelism offered by spark and you computation will be run in one executor (core if your are in standalone mode), so you must remove this step or change 1 to a number propositional to the number of your CPU cores (standalone mode) or the number of executors (cluster mode).
The RDD conversion kicks in and literally takes hours to complete.
If I understand correctly you would covert the DataFrame
to an RDD
, this is really a bad practice in spark and you should avoid such conversion as possible as you can.
this is because the data in DataFrame
and Dataset
are encoded using special spark encoders (it's called tungstant if I well remembered it) which take much less memory then the JVM serialization encoders, so such conversion mean that spark will change the type of your data from his own one (which take much less memory and let spark optimize a lot of commutations by just work the encoded data and not serialize the data to work with and then deserialize it) to the JVM data type and this why DataFrame
s and Dataset
s are very powerful than RDD
s
Hope this help you
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With