 

Why filter does not preserve partitioning?

This is a quote from jaceklaskowski.gitbooks.io.

Some operations, e.g. map, flatMap, filter, don’t preserve partitioning. map, flatMap, filter operations apply a function to every partition.

I don't understand why filter does not preserve partitioning. It just keeps the subset of each partition that satisfies a condition, so I think partitioning could be preserved. Why isn't it like that?

Hoori M. asked May 11 '18 08:05


2 Answers

You are of course right. The quote is simply incorrect: filter does preserve partitioning (for the reason you've already described), and it is trivial to confirm:

val rdd = sc.range(0, 10).map(x => (x % 3, None)).partitionBy(
  new org.apache.spark.HashPartitioner(11)
)

rdd.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

val filteredRDD = rdd.filter(_._1 == 3)
filteredRDD.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

rdd.partitioner == filteredRDD.partitioner
// Boolean = true
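To see why the partitioner stays valid, here is a minimal, Spark-free sketch. It assumes Spark's HashPartitioner placement rule (non-negative `key.hashCode` modulo the number of partitions, with null keys in partition 0); the object and helper names (`FilterKeepsPartitioning`, `placeByKey`, `isConsistent`, `filterPartitions`) are hypothetical. Because filter runs inside each partition and only drops elements, every surviving pair is still in the partition its key maps to.

```scala
// Hypothetical, Spark-free model of a hash-partitioned pair RDD.
object FilterKeepsPartitioning {
  // Assumed to mirror HashPartitioner.getPartition: non-negative
  // key.hashCode modulo numPartitions, with null keys in partition 0.
  def getPartition(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  // Place each pair in the partition its key maps to.
  def placeByKey[K, V](data: Seq[(K, V)], numPartitions: Int): Vector[Vector[(K, V)]] =
    Vector.tabulate(numPartitions) { p =>
      data.filter { case (k, _) => getPartition(k, numPartitions) == p }.toVector
    }

  // The promise a partitioner makes: every pair sits in its key's partition.
  def isConsistent[K, V](parts: Vector[Vector[(K, V)]]): Boolean =
    parts.zipWithIndex.forall { case (part, idx) =>
      part.forall { case (k, _) => getPartition(k, parts.size) == idx }
    }

  // filter runs inside each partition and only drops elements;
  // nothing is moved to another partition.
  def filterPartitions[K, V](parts: Vector[Vector[(K, V)]])(
      pred: ((K, V)) => Boolean): Vector[Vector[(K, V)]] =
    parts.map(_.filter(pred))

  def main(args: Array[String]): Unit = {
    val parts = placeByKey((0L until 10L).map(x => (x % 3, x)), 11)
    val filtered = filterPartitions(parts)(_._1 == 1L)
    println(isConsistent(parts))      // true
    println(isConsistent(filtered))   // true
    println(filtered.map(_.size).sum) // 3
  }
}
```

Whatever predicate you pass, `isConsistent` cannot flip from true to false, because no pair changes either its key or its partition.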

filter differs in this respect from operations like map, which don't preserve partitioning (the Partitioner is dropped):

rdd.map(identity _).partitioner
// Option[org.apache.spark.Partitioner] = None
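A small Spark-free sketch of why map cannot keep the partitioner: the function runs in place inside each partition, so if it rewrites a key, the pair stays where it was while its new key hashes elsewhere. Spark cannot inspect an arbitrary function, so it has to drop the partitioner. Keeping keys unchanged, as `PairRDDFunctions.mapValues` does, leaves the placement valid, which is why `mapValues` keeps the partitioner. The object and its members are hypothetical, and the placement rule is assumed to match HashPartitioner.

```scala
// Hypothetical, Spark-free model: map runs in place inside each partition.
object MapCanBreakPartitioning {
  // Assumed to mirror HashPartitioner.getPartition (non-negative hashCode mod n).
  def getPartition(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  // True when every pair sits in the partition its key maps to.
  def isConsistent[K, V](parts: Vector[Vector[(K, V)]]): Boolean =
    parts.zipWithIndex.forall { case (part, idx) =>
      part.forall { case (k, _) => getPartition(k, parts.size) == idx }
    }

  val numPartitions = 11

  // Pairs (x % 3, x) placed by key, using the assumed placement rule above.
  val parts: Vector[Vector[(Long, Long)]] =
    Vector.tabulate(numPartitions) { p =>
      (0L until 10L).map(x => (x % 3, x))
        .filter { case (k, _) => getPartition(k, numPartitions) == p }
        .toVector
    }

  // A map that rewrites keys: elements stay put, but their keys now hash elsewhere.
  val keysChanged: Vector[Vector[(Long, Long)]] =
    parts.map(_.map { case (k, v) => (k + 1, v) })

  // A mapValues-style map: keys untouched, so placement is still valid.
  val valuesChanged: Vector[Vector[(Long, Long)]] =
    parts.map(_.map { case (k, v) => (k, v * 10) })

  def main(args: Array[String]): Unit = {
    println(isConsistent(parts))         // true
    println(isConsistent(keysChanged))   // false
    println(isConsistent(valuesChanged)) // true
  }
}
```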

Datasets are a bit more subtle, as filters are normally pushed down, but overall the behavior is similar.

Alper t. Turker answered Nov 19 '22 00:11


filter does preserve partitioning; at least, that is what the source code of filter suggests (preservesPartitioning = true):

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
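The `preservesPartitioning` flag in the snippet above is what decides the outcome: `MapPartitionsRDD` reuses its parent's partitioner only when the flag is true. Below is a toy model of that decision, not Spark's implementation; `ToyRDD` and `ToyRDDDemo` are hypothetical, and the partitioner is represented as a plain label rather than a real `Partitioner`.

```scala
// Hypothetical toy, not Spark's API: the partitioner is just a label.
final case class ToyRDD[T](partitions: Vector[Vector[T]], partitioner: Option[String]) {
  // Mirrors the MapPartitionsRDD decision: keep the parent's partitioner
  // only when the caller declares that partitioning is preserved.
  def mapPartitions[U](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean): ToyRDD[U] =
    ToyRDD(partitions.map(p => f(p.iterator).toVector),
      if (preservesPartitioning) partitioner else None)

  // Like RDD.filter: elements are only dropped, so the flag is true.
  def filter(pred: T => Boolean): ToyRDD[T] =
    mapPartitions[T](_.filter(pred), preservesPartitioning = true)

  // Like RDD.map: f may change keys, so the flag is false.
  def map[U](f: T => U): ToyRDD[U] =
    mapPartitions[U](_.map(f), preservesPartitioning = false)
}

object ToyRDDDemo {
  def main(args: Array[String]): Unit = {
    val rdd = ToyRDD(Vector(Vector(1, 4), Vector(2, 5)), Some("HashPartitioner(2)"))
    println(rdd.filter(_ > 1).partitioner) // Some(HashPartitioner(2))
    println(rdd.map(_ + 1).partitioner)    // None
  }
}
```

In real Spark the same flag is exposed on `RDD.mapPartitions(f, preservesPartitioning = false)`, so you can keep a partitioner yourself when you know your function never changes keys.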
Raphael Roth answered Nov 18 '22 23:11