Why is a Spark Row object so big compared to equivalent structures?

Tags:

apache-spark

I have been playing around with the java-sizeof library (https://github.com/phatak-dev/java-sizeof) and using it to measure data set sizes in Apache Spark. As it turns out, the Row object is ridiculously big. Like hugely big -- why is that?

Take a fairly simple schema:

root
 |-- account: string (nullable = true)
 |-- date: long (nullable = true)
 |-- dialed: string (nullable = true)
 |-- duration: double (nullable = true)

Example data looks like this:

+-------+-------------+----------+--------+
|account|         date|    dialed|duration|
+-------+-------------+----------+--------+
|   5497|1434620384003|9075112643|   790.0|
+-------+-------------+----------+--------+
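
For reference, a minimal sketch of a DataFrame with the same shape (assuming a Spark shell with spark.implicits._ in scope; this is just an illustrative reconstruction of the example above, and nullability may differ slightly from the printed schema):

import spark.implicits._

// illustrative single-row reconstruction of the example data
val df = Seq(
  ("5497", 1434620384003L, "9075112643", 790.0)
).toDF("account", "date", "dialed", "duration")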

So now we do:

val row = df.take(1)(0)
// row: org.apache.spark.sql.Row = [5497,1434620384003,9075112643,790.0]

So now I use SizeEstimator

SizeEstimator.estimate(row)
// res19: Long = 85050896

81 megabytes! For a single row! Thinking this is some kind of mistake, I do:

SizeEstimator.estimate(df.take(100))
// res20: Long = 85072696

Interestingly, it's not much bigger -- only about 20K bigger, despite holding 100 times the amount of data. Above 100 rows, the growth appears to be linear. For 1,000 rows it looks like this:

SizeEstimator.estimate(df.take(1000))
// res21: Long = 850711696

Ok, so that's about 10 times bigger than 100 rows -- more or less linear, and further tests show it keeps growing linearly past 100 rows. Based on these tests, after about 100 rows the cost per Row object is still over 800 KB!
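
For reference, the per-row arithmetic implied by the numbers above:

850711696L / 1000
// res: Long = 850711 -- that is, over 800 KB per Row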

Out of curiosity, I tried a couple of different object types for the same underlying data. For example, here are the results for an Array of tuples (one tuple of plain field values per row) instead of Row objects:

SizeEstimator.estimate(
  df.map(r => (r.getString(0), r.getLong(1), r.getString(2), r.getDouble(3))).take(1)
)
// res22: Long = 216

Ok, that's a little better. Even better, for 10 rows it is only 1,976 bytes, and for 100 it is only 19,616 bytes. Definitely going in the right direction.

Next, I encoded the same DataFrame as an RDD[Array[Byte]], where each Array[Byte] is a binary-encoded Avro record with the same schema as the underlying DataFrame. Then I do:

SizeEstimator.estimate(encodedRdd.take(1))
// res23: Long = 72

72 bytes -- even better! And for 100 rows it's 5,216 bytes -- about 52 bytes a row -- and the per-row cost keeps going down from there (48,656 bytes for 1,000 records, roughly 49 bytes a row).
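
(The Avro encoding step isn't shown above. For context, here is a rough sketch of how such an RDD[Array[Byte]] could be produced with Avro's GenericDatumWriter; the Avro schema and field mapping below are illustrative, and this assumes the Avro library is on the classpath.)

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

// Avro schema mirroring the DataFrame schema (illustrative)
val avroSchemaJson = """
  {"type": "record", "name": "CallRecord", "fields": [
    {"name": "account",  "type": ["null", "string"], "default": null},
    {"name": "date",     "type": ["null", "long"],   "default": null},
    {"name": "dialed",   "type": ["null", "string"], "default": null},
    {"name": "duration", "type": ["null", "double"], "default": null}
  ]}
"""

val encodedRdd = df.rdd.mapPartitions { rows =>
  // build the Avro schema and writer once per partition
  val schema = new Schema.Parser().parse(avroSchemaJson)
  val writer = new GenericDatumWriter[GenericRecord](schema)
  rows.map { r =>
    val record = new GenericData.Record(schema)
    record.put("account", r.getString(0))
    record.put("date", r.getLong(1))
    record.put("dialed", r.getString(2))
    record.put("duration", r.getDouble(3))
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray   // one binary-encoded Avro record per row
  }
}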

So, at its best, a Row object weighs about 850 KB per row, whereas a binary Avro record of the same data is about 50 bytes.

What is going on??

Asked Apr 13 '16 by David Griffin

1 Answer

Actually Row by itself is not that big. That is why you don't see a significant change in size when you take more rows. The problem seems to be the schema information (see also the sanity check sketched after the list below):

  1. When you collect data you actually get a GenericRowWithSchema:

    val df = Seq((1, "foo"), (2, "bar")).toDF
    df.first.getClass
    
    // res12: Class[_ <: org.apache.spark.sql.Row] = 
    //   class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    
  2. GenericRowWithSchema carries schema information via its schema argument:

    class GenericRowWithSchema(values: Array[Any], 
      override val schema: StructType)
    
  3. Let's confirm this is really the source of the problem:

    import com.madhukaraphatak.sizeof.SizeEstimator
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    
    val rowWithSchema = df.first 
    val rowWithoutSchema = new GenericRowWithSchema(
      rowWithSchema.toSeq.toArray, null)
    
    SizeEstimator.estimate(rowWithSchema)
    // Long = 1444255708
    
    SizeEstimator.estimate(rowWithoutSchema)
    // Long = 120
    
  4. Hypothesis: the estimated size you see includes the size of the schema:

    SizeEstimator.estimate(df.schema)
    // Long = 1444361928
    

    which is roughly the same order of magnitude as the collected rows. Let's create a new schema from scratch:

    import org.apache.spark.sql.types._
    
    val schema = StructType(Seq(
      StructField("_1", IntegerType, false),
      StructField("_2", StringType, true)))
    
    
    val anotherRowWithSchema = new GenericRowWithSchema(
      Array(0, "foo"), schema) 
    
    SizeEstimator.estimate(anotherRowWithSchema)
    // Long = 1444905324
    

    So as you can see the results are consistent.

  5. Why is the schema so large? Hard to say. When you take a look at the code you'll see that StructType is a complex class, not a simple schema definition, even excluding its companion object.

    It doesn't explain the reported size though. I suspect it could be some fluke in SizeEstimator, but I am not sure yet.

  6. You can further isolate the problem by estimating the size of a single StructField:

    import org.apache.spark.sql.types._
    import com.madhukaraphatak.sizeof.SizeEstimator
    
    object App {
      def main(args: Array[String]) {
        val schema = StructField("foo", IntegerType, true)
        println(SizeEstimator.estimate(schema))
        // 271872172
      }
    }
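
As a final sanity check (a sketch, not part of the measurements above): if the collected rows all share a single StructType instance, the estimator walks that shared schema once per estimate() call, which would explain why 1 row and 100 rows come out at almost the same size:

    import com.madhukaraphatak.sizeof.SizeEstimator

    val rows = df.take(100)

    // do all collected rows reference the same schema object?
    rows.map(r => System.identityHashCode(r.schema)).distinct.length
    // expected: 1 if a single StructType instance is shared

    // if so, adding rows should barely move the estimate
    SizeEstimator.estimate(rows.take(1))
    SizeEstimator.estimate(rows)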
    
Answered Sep 20 '22 by zero323