I am new to Apache Spark (version 1.4.1). I wrote a small piece of code to read a text file and store its data in an RDD.
Is there a way I can get the size of the data in the RDD?
This is my code:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator
import org.apache.spark.sql.Row
object RddSize {

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "data size")
    val FILE_LOCATION = "src/main/resources/employees.csv"
    val peopleRdd = sc.textFile(FILE_LOCATION)
    val newRdd = peopleRdd.filter(str => str.contains(",M,"))
    // Here I want to find the size of the remaining data
  }
}
I want to get the size of the data before the filter transformation (peopleRdd) and after it (newRdd).
Similar to Python pandas, you can get the size and shape of a PySpark (Spark with Python) DataFrame by running the count() action to get the number of rows and len(df.columns) to get the number of columns.
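A rough Scala equivalent for the question's RDDs (count() is the direct analogue of a DataFrame row count; the column count below is an assumption that every line uses "," as the delimiter):

// Number of rows (lines) before and after the filter.
println(s"peopleRdd has ${peopleRdd.count()} rows")
println(s"newRdd has ${newRdd.count()} rows")

// Approximate number of columns for an RDD of CSV lines,
// taken from the first line (assumes a consistent delimiter).
val numColumns = peopleRdd.first().split(",").length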
collect(): returns a list that contains all of the elements in this RDD. This method should only be used if the resulting array is expected to be small, as all of the data is loaded into the driver's memory.
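For a small RDD this can be used directly on the driver; a minimal sketch against the question's peopleRdd:

// Only safe for small RDDs: every element is pulled into driver memory.
val lines: Array[String] = peopleRdd.collect()
println(s"collected ${lines.length} lines, ${lines.map(_.length).sum} characters in total")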
There are multiple ways to get the RDD size:
1. Add a Spark listener to your SparkContext:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

sc.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    val rddInfos = stageCompleted.stageInfo.rddInfos
    rddInfos.foreach(info => {
      println("rdd memSize " + info.memSize)
      println("rdd diskSize " + info.diskSize)
    })
  }
})
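One caveat (an assumption about how the listener reports sizes, not stated above): memSize and diskSize are only filled in for RDDs that are actually persisted, so a typical way to exercise the listener would be:

import org.apache.spark.storage.StorageLevel

// Cache the RDD, then run an action so a stage completes and the listener fires.
newRdd.persist(StorageLevel.MEMORY_AND_DISK)
newRdd.count()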
2. Save your RDD as a text file,
myRDD.saveAsTextFile("person.txt")
and then call the Apache Spark REST API:
/applications/[app-id]/stages
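A minimal sketch of querying that endpoint from Scala, assuming the default driver UI port 4040 and a placeholder application id (neither is given above):

import scala.io.Source

// <app-id> is a placeholder; the real id is shown in the Spark UI and driver logs.
val appId = "<app-id>"
val stagesJson = Source.fromURL(s"http://localhost:4040/api/v1/applications/$appId/stages").mkString
println(stagesJson)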
3. You can also try SizeEstimator:
val rddSize = SizeEstimator.estimate(myRDD)
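Note that SizeEstimator.estimate on the RDD reference measures the RDD object on the driver rather than the distributed data. A hedged sketch that estimates the data itself, assuming each partition can be materialized in executor memory:

import org.apache.spark.util.SizeEstimator

// Estimate the in-memory size of each partition's contents, then sum the results.
val estimatedBytes = peopleRdd
  .mapPartitions(iter => Iterator(SizeEstimator.estimate(iter.toArray)))
  .reduce(_ + _)
println(s"estimated in-memory size: $estimatedBytes bytes")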
I'm not sure you need to do this. You could cache the RDD and check the size in the Spark UI. But let's say that you do want to do this programmatically; here is a solution.
def calcRDDSize(rdd: RDD[String]): Long = {
  // map each line to its size in bytes (UTF-8 is the default encoding)
  rdd.map(_.getBytes("UTF-8").length.toLong)
     .reduce(_ + _) // add the sizes together
}
You can then call this function for your two RDDs:
println(s"peopleRdd is [${calcRDDSize(peopleRdd)}] bytes in size")
println(s"newRdd is [${calcRDDSize(newRdd)}] bytes in size")
This solution should work even if the file size is larger than the memory available in the cluster.