 

How to compute percentiles in Apache Spark

Tags:

apache-spark

I have an RDD of integers (i.e. RDD[Int]) and what I would like to do is to compute the following percentiles: [0th, 10th, 20th, ..., 90th, 100th]. What is the most efficient way to do that?

asked Mar 02 '15 by user706838


3 Answers

You can:

  1. Sort the dataset via rdd.sortBy()
  2. Compute the size of the dataset via rdd.count()
  3. Zip with index to facilitate percentile retrieval
  4. Retrieve the desired percentile via rdd.lookup(), e.g. for the 10th percentile rdd.lookup(0.1 * size) (see the sketch after this list)
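
For an RDD[Int] as in the question, a minimal Scala sketch of these four steps might look like this (the rdd and sc names are assumptions for illustration):

    // sort, then key each value by its rank: RDD[(Long, Int)]
    val sorted = rdd.sortBy(identity).zipWithIndex().map { case (v, i) => (i, v) }
    val size = rdd.count()
    // 10th percentile: the element whose rank is 10% of the way through
    val p10 = sorted.lookup((0.1 * (size - 1)).toLong).head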

To compute the median and the 99th percentile: getPercentiles(rdd, new double[]{0.5, 0.99}, size, numPartitions);

In Java 8:

    public static double[] getPercentiles(JavaRDD<Double> rdd, double[] percentiles, long rddSize, int numPartitions) {
        double[] values = new double[percentiles.length];

        // sort ascending, then key each value by its rank
        JavaRDD<Double> sorted = rdd.sortBy((Double d) -> d, true, numPartitions);
        JavaPairRDD<Long, Double> indexed = sorted.zipWithIndex().mapToPair((Tuple2<Double, Long> t) -> t.swap());

        for (int i = 0; i < percentiles.length; i++) {
            double percentile = percentiles[i];
            // clamp so that percentile 1.0 maps to the last element rather than past the end
            long id = Math.min((long) (rddSize * percentile), rddSize - 1);
            values[i] = indexed.lookup(id).get(0);
        }

        return values;
    }

Note that this requires sorting the dataset, which takes O(n log n) time and can be expensive on large datasets.

The other answer, which suggests simply computing a histogram, would not compute the percentiles correctly. Here is a counter-example: a dataset of 100 numbers, 99 of which are 0 and one of which is 1. All 99 zeros end up in the first bin and the 1 in the last bin, with 8 empty bins in the middle.
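
A quick sketch of that counter-example (sc is an assumed SparkContext):

    val data = sc.parallelize(Seq.fill(99)(0.0) :+ 1.0)
    // split the range [0, 1] into 10 equal-width buckets
    val (buckets, counts) = data.histogram(10)
    // counts: Array(99, 0, 0, 0, 0, 0, 0, 0, 0, 1)
    // the bucket counts alone cannot recover, say, the 90th percentile (0)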

answered Oct 04 '22 by Julien


How about t-digest?

https://github.com/tdunning/t-digest

A new data structure for accurate on-line accumulation of rank-based statistics such as quantiles and trimmed means. The t-digest algorithm is also very parallel friendly making it useful in map-reduce and parallel streaming applications.

The t-digest construction algorithm uses a variant of 1-dimensional k-means clustering to produce a data structure that is related to the Q-digest. This t-digest data structure can be used to estimate quantiles or compute other rank statistics. The advantage of the t-digest over the Q-digest is that the t-digest can handle floating point values while the Q-digest is limited to integers. With small changes, the t-digest can handle any values from any ordered set that has something akin to a mean. The accuracy of quantile estimates produced by t-digests can be orders of magnitude more accurate than those produced by Q-digests in spite of the fact that t-digests are more compact when stored on disk.

In summary, the particularly interesting characteristics of the t-digest are that it

  • has smaller summaries than Q-digest
  • works on doubles as well as integers.
  • provides part per million accuracy for extreme quantiles and typically <1000 ppm accuracy for middle quantiles
  • is fast
  • is very simple
  • has a reference implementation that has > 90% test coverage
  • can be used with map-reduce very easily because digests can be merged

It should be fairly easy to use the reference Java implementation from Spark.
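
Sketching how that might look from Spark (an assumption-laden illustration, not code from the t-digest project: it presumes the library's TDigest.createMergingDigest(compression), add(double), add(TDigest), and quantile(q) methods, and that digests serialize cleanly across the cluster; approxPercentiles is an illustrative name):

    import com.tdunning.math.stats.TDigest
    import org.apache.spark.rdd.RDD

    def approxPercentiles(rdd: RDD[Double], quantiles: Seq[Double]): Seq[Double] = {
      // build one digest per partition, then merge digests pairwise
      val digest = rdd.treeAggregate(TDigest.createMergingDigest(100))(
        seqOp = (d, x) => { d.add(x); d },
        combOp = (a, b) => { a.add(b); a }
      )
      quantiles.map(q => digest.quantile(q))
    }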

answered Oct 04 '22 by pauldoo


I discovered this gist

https://gist.github.com/felixcheung/92ae74bc349ea83a9e29

that contains the following function:

  import org.apache.spark.rdd.RDD

  /**
   * compute percentile from an unsorted Spark RDD
   * @param data: input data set of Long integers
   * @param tile: percentile to compute (eg. 85 percentile)
   * @return value of input data at the specified percentile
   */
  def computePercentile(data: RDD[Long], tile: Double): Double = {
    // NIST method; data to be sorted in ascending order
    val r = data.sortBy(x => x)
    val c = r.count()
    if (c == 1) r.first()
    else {
      val n = (tile / 100d) * (c + 1d)
      val k = math.floor(n).toLong
      val d = n - k
      if (k <= 0) r.first()
      else {
        val index = r.zipWithIndex().map(_.swap)
        if (k >= c) {
          index.lookup(c - 1).head
        } else {
          // interpolate between the k-th and (k+1)-th smallest values
          val lower = index.lookup(k - 1).head
          val upper = index.lookup(k).head
          lower + d * (upper - lower)
        }
      }
    }
  }
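
For instance, a quick check of the function above (sc assumed):

    val data = sc.parallelize(1L to 100L)
    computePercentile(data, 50)   // 50.5 under the NIST interpolation
    computePercentile(data, 85)   // 85.85
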
answered Oct 04 '22 by Metropolis