This is a quote from jaceklaskowski.gitbooks.io:
Some operations, e.g. map, flatMap, filter, don’t preserve partitioning. map, flatMap, filter operations apply a function to every partition.
I don't understand why filter does not preserve partitioning. It just takes a subset of each partition that satisfies a condition, so I'd think the partitioning could be preserved. Why isn't that the case?
You are of course right; the quote is simply incorrect. filter does preserve partitioning (for the reason you've already described), and it is trivial to confirm:
// Build a pair RDD keyed by x % 3 and hash-partition it into 11 partitions
val rdd = sc.range(0, 10).map(x => (x % 3, None)).partitionBy(
  new org.apache.spark.HashPartitioner(11)
)
rdd.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)
val filteredRDD = rdd.filter(_._1 == 2) // keys are 0, 1, 2, so this keeps a non-empty subset
filteredRDD.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)
rdd.partitioner == filteredRDD.partitioner
// Boolean = true
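This matters in practice: because the partitioner survives the filter, a subsequent key-based operation can reuse it instead of shuffling again. A minimal sketch, reusing filteredRDD from above (reduceByKey picks up an existing partitioner by default):
val counts = filteredRDD.mapValues(_ => 1).reduceByKey(_ + _)
counts.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)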
This stands in contrast to operations like map, which don't preserve partitioning (the resulting RDD's Partitioner is None):
rdd.map(identity _).partitioner
// Option[org.apache.spark.Partitioner] = None
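The reason is that Spark cannot know whether the function passed to map changes the keys. If you only transform the values, mapValues leaves the keys, and therefore the partitioner, intact:
rdd.mapValues(identity).partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)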
Datasets are a bit more subtle, as filters are normally pushed down, but overall the behavior is similar.
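One way to see the push-down, as a sketch (the Parquet path here is hypothetical): read a columnar source, filter it, and look for the predicate under PushedFilters in the physical plan:
val df = spark.read.parquet("/tmp/example.parquet") // hypothetical file
df.filter(df("id") > 5).explain()
// The physical plan should list something like PushedFilters: [IsNotNull(id), GreaterThan(id,5)]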
filter does preserve partitioning; at least, this is what the source code of filter suggests (preservesPartitioning = true):
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
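The same flag is exposed on mapPartitions, which defaults to preservesPartitioning = false precisely because Spark cannot verify what an arbitrary function does to the keys; you can opt in when you know the keys are untouched. A small, self-contained sketch:
val pairs = sc.range(0, 10).map(x => (x % 3, None))
  .partitionBy(new org.apache.spark.HashPartitioner(11))
// Equivalent to a filter, with the preservation of partitioning made explicit
pairs.mapPartitions(_.filter(_._1 == 2), preservesPartitioning = true).partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)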