
Compute size of Spark dataframe - SizeEstimator gives unexpected results

I am trying to find a reliable way to compute the size (in bytes) of a Spark dataframe programmatically.

The reason is that I would like to have a method to compute an "optimal" number of partitions ("optimal" could mean different things here: it could mean having an optimal partition size, or resulting in an optimal file size when writing to Parquet tables - but both can be assumed to be some linear function of the dataframe size). In other words, I would like to call coalesce(n) or repartition(n) on the dataframe, where n is not a fixed number but rather a function of the dataframe size.
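To illustrate, this is roughly the helper I have in mind; a sketch only, where the size estimate itself is the missing piece and the 128 MB target partition size is just an example value:

// Sketch: derive a partition count from an estimated dataframe size.
val estimatedSizeInBytes: Long = ???               // the reliable estimate this question is about
val targetPartitionBytes = 128L * 1024 * 1024      // example target: 128 MB per partition
val n = math.max(1, math.ceil(estimatedSizeInBytes.toDouble / targetPartitionBytes).toInt)
df.coalesce(n)                                     // or df.repartition(n) if the count must grow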

Other topics on SO suggest using SizeEstimator.estimate from org.apache.spark.util to get the size in bytes of the dataframe, but the results I'm getting are inconsistent.

First of all, I'm persisting my dataframe to memory:

df.cache().count  

The Spark UI shows a size of 4.8GB in the Storage tab. Then, I run the following command to get the size from SizeEstimator:

import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)

This gives a result of 115'715'808 bytes =~ 116MB. However, applying SizeEstimator to different objects leads to very different results. For instance, I try computing the size separately for each row in the dataframe and sum them:

df.map(row => SizeEstimator.estimate(row.asInstanceOf[AnyRef])).reduce(_ + _)

This results in a size of 12'084'698'256 bytes =~ 12GB. Or, I can try to apply SizeEstimator to every partition:

df.mapPartitions(
  iterator => Seq(SizeEstimator.estimate(
    iterator.toList.map(row => row.asInstanceOf[AnyRef]))).toIterator
).reduce(_ + _)

which results again in a different size of 10'792'965'376 bytes =~ 10.8GB.

I understand there are memory optimizations / memory overhead involved, but after performing these tests I don't see how SizeEstimator can be used to get a sufficiently good estimate of the dataframe size (and consequently of the partition size, or resulting Parquet file sizes).

What is the appropriate way (if any) to apply SizeEstimator in order to get a good estimate of a dataframe size or of its partitions? If there isn't any, what is the suggested approach here?

asked Mar 26 '18 by hiryu



2 Answers

Unfortunately, I was not able to get reliable estimates from SizeEstimator, but I found another strategy: if the dataframe is cached, we can extract its size from queryExecution as follows:

df.cache.foreach(_ => ())
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(
  catalyst_plan).optimizedPlan.stats.sizeInBytes

For the example dataframe, this gives exactly 4.8GB (which also corresponds to the file size when writing to an uncompressed Parquet table).

This has the disadvantage that the dataframe needs to be cached, but it is not a problem in my case.
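For completeness, this estimate can then drive the repartitioning asked about in the question. A rough sketch continuing from the snippet above (the 128 MB target per output file and the output path are only example values):

// Sketch: pick a partition count from the estimated size and a target output file size.
val targetBytesPerFile = 128L * 1024 * 1024   // example target, tune for your tables
val n = ((df_size_in_bytes + targetBytesPerFile - 1) / targetBytesPerFile).toInt.max(1)
df.coalesce(n).write.parquet("/path/to/output")   // example output path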

EDIT: Replaced df.cache.foreach(_=>_) by df.cache.foreach(_ => ()), thanks to @DavidBenedeki for pointing it out in the comments.

answered Sep 17 '22 by hiryu


Apart from SizeEstimator, which you have already tried (good insight), below is another option:

RDDInfo[] getRDDStorageInfo() 

Returns information about which RDDs are cached, whether they are in memory, on disk, or both, how much space they take, etc.

The Spark UI's Storage tab actually uses this (see the Spark docs).

Below is the implementation from Spark:

/**
 * :: DeveloperApi ::
 * Return information about what RDDs are cached, if they are in mem or on disk, how much space
 * they take, etc.
 */
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
  getRDDStorageInfo(_ => true)
}

private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = {
  assertNotStopped()
  val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray
  rddInfos.foreach { rddInfo =>
    val rddId = rddInfo.id
    val rddStorageInfo = statusStore.asOption(statusStore.rdd(rddId))
    rddInfo.numCachedPartitions = rddStorageInfo.map(_.numCachedPartitions).getOrElse(0)
    rddInfo.memSize = rddStorageInfo.map(_.memoryUsed).getOrElse(0L)
    rddInfo.diskSize = rddStorageInfo.map(_.diskUsed).getOrElse(0L)
  }
  rddInfos.filter(_.isCached)
}

yourRDD.toDebugString on an RDD also uses this (code here).
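For reference, a minimal usage sketch of getRDDStorageInfo from the driver (the field names come from Spark's RDDInfo; the println formatting is just an example):

// Sketch: list cached RDDs (including those backing cached dataframes) with their sizes.
spark.sparkContext.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id} '${info.name}': ${info.numCachedPartitions} cached partitions, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}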


General Note:

In my opinion, to get an optimal number of records in each partition, and to check that your repartitioning is correct and the records are uniformly distributed, I would suggest trying something like the below, then adjusting the repartition number and measuring the partition sizes. That would be a more sensible way to address this kind of problem.

import spark.implicits._   // needed for toDF outside the spark-shell

yourdf.rdd.mapPartitionsWithIndex { case (index, rows) => Iterator((index, rows.size)) }
  .toDF("PartitionNumber", "NumberOfRecordsPerPartition")
  .show

or use existing Spark functions (depending on your Spark version):

import org.apache.spark.sql.functions._
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count.show

answered Sep 19 '22 by Ram Ghadiyaram