I have been playing around with the java-sizeof library (https://github.com/phatak-dev/java-sizeof) and using it to measure data set sizes in Apache Spark. As it turns out, the Row object is ridiculously big. Like hugely big -- why is that?
Take a fairly simple schema:
root
|-- account: string (nullable = true)
|-- date: long (nullable = true)
|-- dialed: string (nullable = true)
|-- duration: double (nullable = true)
Example data looks like this:
+-------+-------------+----------+--------+
|account| date| dialed|duration|
+-------+-------------+----------+--------+
| 5497|1434620384003|9075112643| 790.0|
+-------+-------------+----------+--------+
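For anyone who wants to reproduce this, a one-row DataFrame with the same schema can be built roughly like this. This is just a stand-in for my real DataFrame; sqlContext and sc are the usual shell names:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Repro sketch only -- a stand-in with the same schema as the real data.
val schema = StructType(Seq(
  StructField("account",  StringType, nullable = true),
  StructField("date",     LongType,   nullable = true),
  StructField("dialed",   StringType, nullable = true),
  StructField("duration", DoubleType, nullable = true)))

val df = sqlContext.createDataFrame(
  sc.parallelize(Seq(Row("5497", 1434620384003L, "9075112643", 790.0))),
  schema)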
So now we do:
val row = df.take(1)(0)
// row: org.apache.spark.sql.Row = [5497,1434620384003,9075112643,790.0]
So now I use SizeEstimator:
SizeEstimator.estimate(row)
// res19: Long = 85050896
81 megabytes! For a single row! Thinking this is some kind of mistake, I do:
SizeEstimator.estimate(df.take(100))
// res20: Long = 85072696
Interestingly, it's not much bigger -- only about 20 KB bigger, despite holding 100 times the amount of data. Above about 100 rows, the growth seems to become linear. For 1,000 rows it looks like this:
SizeEstimator.estimate(df.take(1000))
// res21: Long = 850711696
Ok, so that's about 10 times bigger than 100 rows -- more or less linear, and further tests show it keeps growing linearly past 100 rows. Based on these tests, beyond about 100 rows the cost per Row object is still over 800 KB!!
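To put that in per-row terms, this is just dividing the numbers reported above -- nothing new measured:
// Plain arithmetic on the estimates reported above.
val measurements = Seq(1 -> 85050896L, 100 -> 85072696L, 1000 -> 850711696L)
measurements.foreach { case (n, bytes) =>
  println(s"$n rows: ${bytes / n} bytes per row")
}
// 1 rows: 85050896 bytes per row
// 100 rows: 850726 bytes per row
// 1000 rows: 850711 bytes per row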
Out of curiosity, I tried a couple of different object types for the same underlying data. For example, here are the results for an Array of tuples instead of Row objects:
SizeEstimator.estimate(
  df.map(r => (r.getString(0), r.getLong(1), r.getString(2), r.getDouble(3))).take(1)
)
// res22: Long = 216
Ok, that's a little better. Even better is that for 10 rows it is only 1,976 bytes, and for 100 it is only 19,616 bytes. Definitely going in the right direction.
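(The 10- and 100-row numbers came from the same kind of call, just with a larger take -- roughly:)
// Same projection to tuples as above, just taking more rows.
val tuples = df.map(r => (r.getString(0), r.getLong(1), r.getString(2), r.getDouble(3)))
SizeEstimator.estimate(tuples.take(10))   // 1,976 bytes
SizeEstimator.estimate(tuples.take(100))  // 19,616 bytes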
Then, I encoded the same DataFrame as an RDD[Array[Byte]], where each Array[Byte] is a binary-encoded Avro record with the same schema as the underlying DataFrame. Then I do:
SizeEstimator.estimate(encodedRdd.take(1))
// res23: Long = 72
72 bytes -- even better! And, for 100 rows, it's 5,216 bytes -- about 52 bytes a row, and it keeps going down from there (48,656 bytes for 1,000 records).
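For reference, the encoding step looked roughly like the following. This is a simplified sketch rather than my exact code, and the Avro schema string is just a hand-written mirror of the DataFrame schema above:
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

// Hand-written Avro schema mirroring the DataFrame schema above.
val avroSchemaJson = """
  {"type": "record", "name": "CallRecord", "fields": [
    {"name": "account",  "type": ["null", "string"]},
    {"name": "date",     "type": ["null", "long"]},
    {"name": "dialed",   "type": ["null", "string"]},
    {"name": "duration", "type": ["null", "double"]}
  ]}"""

val encodedRdd = df.rdd.mapPartitions { rows =>
  // Parse the schema and build the writer once per partition.
  val schema = new Schema.Parser().parse(avroSchemaJson)
  val writer = new GenericDatumWriter[GenericRecord](schema)
  rows.map { r =>
    val record = new GenericData.Record(schema)
    record.put("account", r.getString(0))
    record.put("date", r.getLong(1))
    record.put("dialed", r.getString(2))
    record.put("duration", r.getDouble(3))
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray
  }
}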
So, at its best, a Row object weighs in at about 850k per Row, whereas a binary Avro record of the same data is about 50 bytes.
What is going on??
Actually, Row by itself is not that big, which is why you don't see a significant change in size when you take more rows. The problem seems to be the schema information. When you collect data you actually get a GenericRowWithSchema:
val df = Seq((1, "foo"), (2, "bar")).toDF
df.first.getClass
// res12: Class[_ <: org.apache.spark.sql.Row] =
// class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
GenericRowWithSchema carries the schema information passed in through its schema constructor argument:
class GenericRowWithSchema(values: Array[Any],
override val schema: StructType)
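So every collected row drags a reference to the full StructType along with it, and a size estimator that walks the object graph will presumably traverse that schema for each row. A quick check, reusing the df above:
// Each collected row carries a reference to the full schema.
df.first.schema == df.schema
// Boolean = true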
Let's confirm this is really the source of the problem:
import com.madhukaraphatak.sizeof.SizeEstimator
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
val rowWithSchema = df.first
val rowWithoutSchema = new GenericRowWithSchema(
rowWithSchema.toSeq.toArray, null)
SizeEstimator.estimate(rowWithSchema)
// Long = 1444255708
SizeEstimator.estimate(rowWithoutSchema)
// Long = 120
Hypothesis: the estimated size you see includes the size of the schema:
SizeEstimator.estimate(df.schema)
// Long = 1444361928
which is roughly the same order of magnitude as the collected rows. Let's create a new schema from scratch:
import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("_1", IntegerType, false),
  StructField("_2", StringType, true)))

val anotherRowWithSchema = new GenericRowWithSchema(
  Array(0, "foo"), schema)
SizeEstimator.estimate(anotherRowWithSchema)
// Long = 1444905324
So, as you can see, the results are consistent.
Why is the schema so large? Hard to say. When you take a look at the code you'll see that StructType is a complex class, not a simple schema definition, even excluding its companion object.
That doesn't explain the reported size though. I suspect it could be some fluke in SizeEstimator, but I am not sure yet.
You can further isolate the problem by estimating the size of a single StructField:
import org.apache.spark.sql.types._
import com.madhukaraphatak.sizeof.SizeEstimator
object App {
  def main(args: Array[String]) {
    val schema = StructField("foo", IntegerType, true)
    println(SizeEstimator.estimate(schema))
    // 271872172
  }
}
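As a practical aside (a workaround sketch, not something I have benchmarked thoroughly): if you want a per-row figure that isn't dominated by the schema, you can strip the schema out before estimating, or estimate the schema once, separately:
// Workaround sketch: count the schema once, and estimate only the row values.
val rows = df.collect()
val rowsWithoutSchema = rows.map(_.toSeq.toArray)
SizeEstimator.estimate(rowsWithoutSchema)   // just the values
SizeEstimator.estimate(df.schema)           // the schema, counted once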