Using Scala and Spark, I have the following construction:
val rdd1: RDD[String] = ...
val rdd2: RDD[(String, Any)] = ...
val rdd1pairs = rdd1.map(s => (s, s))
val result = rdd2.join(rdd1pairs)
  .map { case (_: String, (e: Any, _)) => e }
The purpose of mapping rdd1 into a PairRDD is the join with rdd2 in the subsequent step. However, I am actually only interested in the values of rdd2, hence the mapping step in the last line which omits the keys. Effectively, this is an intersection between rdd2 and rdd1 performed with Spark's join() for efficiency reasons.
My question refers to the keys of rdd1pairs: they are created for syntactical reasons only (to allow the join) in the first map step and are later discarded without any usage. How does the compiler handle this? Does it matter in terms of memory consumption whether I use the String s (as shown in the example)? Should I replace it with null or 0 to save a little memory? Does the compiler actually create and store these objects (references), or does it notice that they are never used?
In this case, it is what Spark does at runtime that influences the outcome rather than the compiler, I think. The question is whether Spark can optimise its execution pipeline to avoid creating the redundant copies of s. I'm not sure, but I think Spark will materialise rdd1pairs in memory.
Instead of mapping to (String, String) you could map to (String, Unit):
rdd1.map(s => (s, ()))
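For context, the original pipeline would then look roughly like this (a sketch, not taken from the question; same join, only the placeholder value changes):
val rdd1pairs: RDD[(String, Unit)] = rdd1.map(s => (s, ()))
val result = rdd2.join(rdd1pairs)
  .map { case (_, (e, _)) => e }  // the Unit placeholder is discarded here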
What you're doing is basically a filter of rdd2 based on rdd1. If rdd1 is significantly smaller than rdd2, another method would be to represent the data of rdd1 as a broadcast variable rather than an RDD, and simply filter rdd2. This avoids any shuffling or reduce phase, so it may be quicker, but it will only work if the data of rdd1 is small enough to fit on each node.
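A minimal sketch of that broadcast approach, assuming sc is the SparkContext and that rdd1 fits in driver memory (the helper name intersectValues is purely illustrative):
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def intersectValues(sc: SparkContext, rdd1: RDD[String], rdd2: RDD[(String, Any)]): RDD[Any] = {
  val keys = sc.broadcast(rdd1.collect().toSet)          // ship the small key set to every executor
  rdd2.filter { case (k, _) => keys.value.contains(k) }  // keep only pairs whose key occurs in rdd1
    .values                                              // drop the keys, keeping only the values
}
Note that unlike join(), this yields each matching element of rdd2 exactly once even if rdd1 contains duplicate keys, which is usually what you want for an intersection.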
EDIT:
To see how using Unit rather than String saves space, consider the following two examples:
object size extends App {
  val data = (1 to 1000000).map(i => ("foo" + i, ()))  // keep a reference so the pairs stay on the heap
  val input = scala.io.StdIn.readLine("prompt> ")      // pause so the running JVM can be inspected
}
and
object size extends App {
  val data = (1 to 1000000).map(i => ("foo" + i, "foo" + i))
  val input = scala.io.StdIn.readLine("prompt> ")
}
Using the jstat command, as described in How to check heap usage of a running JVM from the command line?, the first version uses significantly less heap than the second.
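If you prefer a rough in-process check instead of jstat, something along these lines works (a sketch; it assumes a forced GC gives a stable enough reading for a ballpark comparison, and the object name HeapCheck is just illustrative):
object HeapCheck extends App {
  val data = (1 to 1000000).map(i => ("foo" + i, ()))  // swap in "foo" + i as the value to compare

  System.gc()                                          // encourage a collection before measuring
  val rt = Runtime.getRuntime
  val usedMb = (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024)
  println(s"approx. used heap: $usedMb MB")
}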
Edit 2:
Unit is effectively a singleton object with no contents, so logically it should not require any serialization. The fact that the type definition contains Unit tells you all you need to be able to deserialize a structure which has a field of type Unit.
Spark uses Java Serialization by default. Consider the following:
object Main extends App {
  import java.io.{FileOutputStream, ObjectOutputStream}

  case class Foo(a: String, b: String)
  case class Bar(a: String, b: String, c: Unit)

  val foo = Foo("abcdef", "xyz")
  val bar = Bar("abcdef", "xyz", ())

  val fo = new ObjectOutputStream(new FileOutputStream("foo.obj"))
  val bo = new ObjectOutputStream(new FileOutputStream("bar.obj"))

  fo.writeObject(foo)
  bo.writeObject(bar)

  fo.close()
  bo.close()
}
The two files are of identical size:
�� sr Main$Foo3�,�z \ L at Ljava/lang/String;L bq ~ xpt abcdeft xyz
and
�� sr Main$Bar+a!N��b L at Ljava/lang/String;L bq ~ xpt abcdeft xyz
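You can confirm this by checking the file lengths once the snippet above has run, e.g.:
println(new java.io.File("foo.obj").length())  // per the claim above, both should print the same byte count
println(new java.io.File("bar.obj").length())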