 

Find size of data stored in an RDD from a text file in Apache Spark

I am new to Apache Spark (version 1.4.1). I wrote some small code to read a text file and store its data in an RDD.

Is there a way I can get the size of the data in the RDD?

This is my code:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator
import org.apache.spark.sql.Row

object RddSize {

  def main(args: Array[String]) {

    val sc = new SparkContext("local", "data size")
    val FILE_LOCATION = "src/main/resources/employees.csv"
    val peopleRdd = sc.textFile(FILE_LOCATION)

    val newRdd = peopleRdd.filter(str => str.contains(",M,"))
    // Here I want to find the size of the remaining data
  }
}

I want to get the size of the data before the filter transformation (peopleRdd) and after it (newRdd).

asked Aug 24 '15 by bob

People also ask

How do I find my Spark dataset size?

Similar to pandas in Python, you can get the size and shape of a PySpark (Spark with Python) DataFrame by running the count() action to get the number of rows and len(df.columns) to get the number of columns.

What does RDD collect() return?

collect() returns a list that contains all of the elements in this RDD. This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.


2 Answers

There are multiple ways to get the RDD size.

1. Add a Spark listener to your Spark context:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// sc is the SparkContext from your application
sc.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    stageCompleted.stageInfo.rddInfos.foreach { info =>
      println("rdd memSize  " + info.memSize)
      println("rdd diskSize " + info.diskSize)
    }
  }
})
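
Note that the listener only fires once a stage completes, and memSize/diskSize are generally reported only for persisted RDDs. A minimal sketch of how to make it report useful numbers, reusing newRdd from the question:

import org.apache.spark.storage.StorageLevel

// Assumption: newRdd is the filtered RDD from the question.
// memSize/diskSize are typically 0 for RDDs that are not persisted,
// so cache the RDD and run an action before reading the listener output.
newRdd.persist(StorageLevel.MEMORY_AND_DISK)
newRdd.count() // any action works; it completes a stage and fires onStageCompleted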

2. Save your RDD as a text file,

myRDD.saveAsTextFile("person.txt")

and call the Apache Spark REST API:

/applications/[app-id]/stages
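
For example, while the application is running this information is exposed on the driver's web UI port (4040 by default) under /api/v1. A minimal sketch, assuming the driver is reachable on localhost; the [app-id] placeholder can be taken from sc.applicationId:

import scala.io.Source

// Assumption: the Spark UI is running on localhost:4040 (the default)
val appId = sc.applicationId
val stagesJson = Source.fromURL(
  s"http://localhost:4040/api/v1/applications/$appId/stages").mkString
println(stagesJson) // per-stage metrics such as inputBytes and outputBytes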

3. You can also try SizeEstimator:

val rddSize = SizeEstimator.estimate(myRDD)
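
One caveat worth noting: calling SizeEstimator.estimate on the RDD reference from the driver estimates the size of the local RDD object, not the distributed data it represents. If you want a rough estimate of the data itself, a sketch of one approach (not from the original answer, and assuming myRDD holds reference types such as String) is to estimate each record on the executors and add up the results:

import org.apache.spark.util.SizeEstimator

// Rough estimate of the deserialized, in-memory size of the records in myRDD
val approxDataBytes = myRDD
  .map(record => SizeEstimator.estimate(record)) // per-record size estimate
  .reduce(_ + _)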
answered Sep 28 '22 by Gabber


I'm not sure you need to do this. You could cache the RDD and check its size in the Spark UI. But let's say you do want to do this programmatically; here is a solution.

def calcRDDSize(rdd: RDD[String]): Long = {
  // map each line to its size in bytes (UTF-8 is the default encoding)
  rdd.map(_.getBytes("UTF-8").length.toLong)
     .reduce(_ + _) // add the sizes together
}

You can then call this function for your two RDDs:

println(s"peopleRdd is [${calcRDDSize(peopleRdd)}] bytes in size")
println(s"newRdd is [${calcRDDSize(newRdd)}] bytes in size")

This solution should work even if the file size is larger than the memory available in the cluster.

answered Sep 28 '22 by Patrick McGloin