I have an RDD of integers (i.e. RDD[Int]) and what I would like to do is to compute the following percentiles: [0th, 10th, 20th, ..., 90th, 100th]. What is the most efficient way to do that?
To calculate the percentile rank of a column in PySpark, use the percent_rank() function. percent_rank() combined with partitionBy() on another column gives the percentile rank of the column within each group.
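A minimal DataFrame sketch of that in Scala (df and the column names group and score are illustrative, not from the question):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.percent_rank

// Percentile rank of "score" within each "group" (0.0 for the smallest
// value in a group, 1.0 for the largest).
val w = Window.partitionBy("group").orderBy("score")
val ranked = df.withColumn("pct_rank", percent_rank().over(w))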
Percentiles can be calculated using the formula n = (P/100) x (N + 1), where P = the desired percentile, N = the number of values in the data set (sorted from smallest to largest), and n = the ordinal rank of the value at that percentile. Percentiles are frequently used to understand test scores and biometric measurements.
Example question: find the 25th percentile in a sorted list of 8 values. Rank = 25 / 100 x (8 + 1) = 0.25 x 9 = 2.25, so the 25th percentile lies a quarter of the way between the 2nd and 3rd smallest values.
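As a quick sanity check, the same arithmetic in plain Scala (the eight sample values below are made up for illustration):

val xs = Vector(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0) // assumed sample data, sorted
val n = 25.0 / 100 * (xs.length + 1) // 2.25
val k = n.toInt                      // whole part of the rank: 2
val d = n - k                        // fractional part: 0.25
// interpolate between the 2nd and 3rd values: 2.0 + 0.25 * (3.0 - 2.0) = 2.25
val pctl = xs(k - 1) + d * (xs(k) - xs(k - 1))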
You can:
- sort the dataset via rdd.sortBy()
- compute the size of the dataset via rdd.count()
- zip with index to facilitate percentile retrieval
- retrieve the desired percentile via rdd.lookup(), e.g. for the 10th percentile rdd.lookup(0.1 * size)
To compute the median and the 99th percentile: getPercentiles(rdd, new double[]{0.5, 0.99}, size, numPartitions);
In Java 8:
public static double[] getPercentiles(JavaRDD<Double> rdd, double[] percentiles,
                                      long rddSize, int numPartitions) {
    double[] values = new double[percentiles.length];

    // Sort once, then pair each value with its rank so values can be fetched by index.
    JavaRDD<Double> sorted = rdd.sortBy((Double d) -> d, true, numPartitions);
    JavaPairRDD<Long, Double> indexed =
        sorted.zipWithIndex().mapToPair((Tuple2<Double, Long> t) -> t.swap());

    for (int i = 0; i < percentiles.length; i++) {
        double percentile = percentiles[i];
        // Clamp so that percentile 1.0 maps to the last element rather than past the end.
        long id = Math.min((long) (rddSize * percentile), rddSize - 1);
        values[i] = indexed.lookup(id).get(0);
    }
    return values;
}
Note that this requires sorting the dataset, which is O(n log n) and can be expensive on large datasets.
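For the question's RDD[Int], here is a minimal Scala sketch of the same sort-and-lookup idea that fetches all the requested percentiles in a single job instead of one lookup() per percentile (the helper name and structure are my own, not from the answer above):

import org.apache.spark.rdd.RDD

// Sort once, index once, then collect every target rank in one filter pass.
def percentileValues(rdd: RDD[Int], percentiles: Seq[Double]): Seq[(Double, Int)] = {
  val size = rdd.count()
  val indexed = rdd.sortBy(identity).zipWithIndex().map(_.swap)
  // Clamp so that percentile 1.0 maps to the last element, not one past it.
  val rankOf = percentiles.map(p => p -> math.min((p * size).toLong, size - 1)).toMap
  val targetRanks = rankOf.values.toSet
  val valueAtRank = indexed.filter { case (i, _) => targetRanks(i) }.collect().toMap
  percentiles.map(p => p -> valueAtRank(rankOf(p)))
}

// e.g. percentileValues(rdd, (0 to 100 by 10).map(_ / 100.0))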
The other answer, which suggests simply computing a histogram, would not compute the percentiles correctly. Here is a counterexample: a dataset of 100 numbers, 99 of them 0 and one of them 1. All 99 zeros end up in the first bin and the 1 in the last bin, with 8 empty bins in between.
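A quick way to see the problem (sc is an existing SparkContext):

// 99 zeros and a single 1: ten equal-width buckets reveal nothing about,
// say, the 90th percentile (which is 0 here, not something near 1).
val data = sc.parallelize(Seq.fill(99)(0.0) :+ 1.0)
val (buckets, counts) = data.histogram(10)
// counts: Array(99, 0, 0, 0, 0, 0, 0, 0, 0, 1)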
How about t-digest?
https://github.com/tdunning/t-digest
A new data structure for accurate on-line accumulation of rank-based statistics such as quantiles and trimmed means. The t-digest algorithm is also very parallel friendly making it useful in map-reduce and parallel streaming applications.
The t-digest construction algorithm uses a variant of 1-dimensional k-means clustering to produce a data structure that is related to the Q-digest. This t-digest data structure can be used to estimate quantiles or compute other rank statistics. The advantage of the t-digest over the Q-digest is that the t-digest can handle floating point values while the Q-digest is limited to integers. With small changes, the t-digest can handle any values from any ordered set that has something akin to a mean. The accuracy of quantile estimates produced by t-digests can be orders of magnitude more accurate than those produced by Q-digests in spite of the fact that t-digests are more compact when stored on disk.
In summary, the particularly interesting characteristics of the t-digest are that it
- has smaller summaries than Q-digest
- works on doubles as well as integers.
- provides part per million accuracy for extreme quantiles and typically <1000 ppm accuracy for middle quantiles
- is fast
- is very simple
- has a reference implementation that has > 90% test coverage
- can be used with map-reduce very easily because digests can be merged
It should be fairly easy to use the reference Java implementation from Spark.
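For example, a minimal Spark sketch in Scala, assuming the reference library (com.tdunning:t-digest) is on the classpath and that its digests serialize across the cluster; the compression value of 100 and the helper name are my own:

import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD
import com.tdunning.math.stats.TDigest

// Build one digest per partition, merge their centroids into a single
// digest, then query it for any quantiles.
def quantilesViaTDigest(rdd: RDD[Int], qs: Seq[Double]): Seq[Double] = {
  val digest = rdd.treeAggregate(TDigest.createDigest(100))(
    (d, x) => { d.add(x.toDouble); d },
    (d1, d2) => {
      d2.centroids().asScala.foreach(c => d1.add(c.mean(), c.count()))
      d1
    }
  )
  qs.map(q => digest.quantile(q))
}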
I discovered this gist (https://gist.github.com/felixcheung/92ae74bc349ea83a9e29), which contains the following function:
/**
 * compute percentile from an unsorted Spark RDD
 * @param data input data set of Long integers
 * @param tile percentile to compute (e.g. 85 for the 85th percentile)
 * @return value of input data at the specified percentile
 */
def computePercentile(data: RDD[Long], tile: Double): Double = {
  // NIST method; data to be sorted in ascending order
  val r = data.sortBy(x => x)
  val c = r.count()
  if (c == 1) r.first()
  else {
    val n = (tile / 100d) * (c + 1d)
    val k = math.floor(n).toLong
    val d = n - k
    if (k <= 0) r.first()
    else {
      val index = r.zipWithIndex().map(_.swap)
      val last = c
      if (k >= c) {
        index.lookup(last - 1).head
      } else {
        // interpolate linearly between the k-th and (k+1)-th smallest values
        index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head)
      }
    }
  }
}
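A hypothetical call for the deciles in the original question (ints stands for the question's RDD[Int]; note that each call re-sorts the data, so computing all tiles in one function, or caching a sorted RDD, would be cheaper):

val longs: RDD[Long] = ints.map(_.toLong)
val deciles = (0 to 100 by 10).map(t => computePercentile(longs, t.toDouble))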