I have an RDD[Row] that needs to be persisted to a third-party repository, but this repository accepts a maximum of 5 MB in a single call.
So I want to create partitions based on the size of the data in the RDD, not on the number of rows. How can I find the size of an RDD and create partitions based on it?
Similar to Python Pandas, you can get the size and shape of a PySpark (Spark with Python) DataFrame by running the count() action to get the number of rows and len(df.columns) to get the number of columns.
To get the number of partitions, call getNumPartitions() on the DataFrame's underlying RDD, e.g., df.rdd.getNumPartitions(). In Scala this is a parameterless method: df.rdd.getNumPartitions.
Similarly, in PySpark you can get the current number of partitions by running getNumPartitions() of the RDD class, so to use it with a DataFrame you first need to convert it to an RDD.
As Justin and Wang mentioned, it is not straightforward to get the size of an RDD; we can only estimate it.
We can sample an RDD and then use SizeEstimator to get the size of the sample. As Wang and Justin mentioned, based on data sampled offline, say, X rows that used Y GB, Z rows at runtime may take roughly Z*Y/X GB.
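The extrapolation above is plain arithmetic and can be sketched without Spark at all (the sample numbers below are hypothetical, just to illustrate the Z*Y/X rule):

```scala
// Extrapolate total size from a sample: X rows measured at Y bytes
// implies Z rows take roughly Z * Y / X bytes.
def estimateTotalBytes(sampledRows: Long, sampledBytes: Long, totalRows: Long): Long =
  totalRows * sampledBytes / sampledRows

// Hypothetical numbers: 10 sampled rows occupied 4096 bytes,
// so 1000 rows should occupy about 409600 bytes.
val estimated = estimateTotalBytes(sampledRows = 10L, sampledBytes = 4096L, totalRows = 1000L)
// estimated == 409600
```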
Here is sample Scala code to estimate the size of an RDD.
I am new to Scala and Spark, so the sample below could probably be written in a better way.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.util.SizeEstimator

def getTotalSize(rdd: RDD[Row]): Long = {
  // This can be a parameter
  val NO_OF_SAMPLE_ROWS = 10L
  val totalRows = rdd.count()
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    // sample() takes a fraction of the RDD, not a row count
    val fraction = NO_OF_SAMPLE_ROWS.toDouble / totalRows
    val sampleRDD = rdd.sample(withReplacement = true, fraction)
    // The actual number of sampled rows may differ from the target
    val sampledRows = sampleRDD.count()
    if (sampledRows > 0) getRDDSize(sampleRDD) * totalRows / sampledRows else 0L
  } else {
    // The RDD is smaller than the sample size, so just measure it directly
    getRDDSize(rdd)
  }
}

def getRDDSize(rdd: RDD[Row]): Long = {
  rdd.collect().map { row =>
    SizeEstimator.estimate(row.toSeq.map(_.asInstanceOf[AnyRef]))
  }.sum
}
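Once you have the estimated total size, turning the question's 5 MB-per-call limit into a partition count is a ceiling division. A minimal sketch (the `numPartitions` helper is my own name, not a Spark API; you would then call `rdd.repartition` with its result):

```scala
// Each call accepts at most 5 MB, so split the RDD into enough
// partitions that each one stays under that cap.
val MAX_BYTES_PER_CALL = 5L * 1024 * 1024

def numPartitions(totalSizeBytes: Long, maxBytes: Long = MAX_BYTES_PER_CALL): Int =
  // Ceiling division; keep at least one partition even for an empty RDD.
  math.max(1, ((totalSizeBytes + maxBytes - 1) / maxBytes).toInt)

// e.g. an estimated 12 MB RDD needs 3 partitions:
// numPartitions(12L * 1024 * 1024) == 3
// then: rdd.repartition(numPartitions(getTotalSize(rdd)))
```

Note this assumes rows are roughly uniform in size; a heavily skewed RDD could still produce a partition over the cap.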