
How to find median and quantiles using Spark

How can I find median of an RDD of integers using a distributed method, IPython, and Spark? The RDD is approximately 700,000 elements and therefore too large to collect and find the median.

This question is similar to the question below; however, the answer to that question uses Scala, which I do not know.

How can I calculate exact median with Apache Spark?

Using the thinking for the Scala answer, I am trying to write a similar answer in Python.

I know I first want to sort the RDD, but I do not know how. I see the sortBy (sorts this RDD by the given keyfunc) and sortByKey (sorts this RDD, which is assumed to consist of (key, value) pairs) methods. I think both use key-value pairs, and my RDD only has integer elements.

  1. First, I was thinking of doing myrdd.sortBy(lambda x: x)?
  2. Next I will find the length of the rdd (rdd.count()).
  3. Finally, I want to find the element or the 2 elements at the center of the RDD. I need help with this step too (I sketch what I mean below this list).
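Putting the three steps together, this is roughly what I have in mind, using myrdd from above (I am not sure it is correct or efficient):

# Step 1: sort, then pair every element with its position so I can look it up later.
sorted_rdd = myrdd.sortBy(lambda x: x).zipWithIndex().map(lambda vi: (vi[1], vi[0])).cache()

# Step 2: length of the RDD.
n = sorted_rdd.count()

# Step 3: middle element for odd n, average of the two middle elements for even n.
if n % 2 == 1:
    median = sorted_rdd.lookup(n // 2)[0]
else:
    median = (sorted_rdd.lookup(n // 2 - 1)[0] + sorted_rdd.lookup(n // 2)[0]) / 2.0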

EDIT:

I had an idea. Maybe I can index my RDD and then key = index and value = element. And then I can try to sort by value? I don't know if this is possible because there is only a sortByKey method.
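For example, is something like one of these what the API expects (I am just guessing at keyBy here)?

myrdd.sortBy(lambda x: x)                        # sortBy seems to take plain elements directly
myrdd.keyBy(lambda x: x).sortByKey().values()    # or go through (key, value) pairs and back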

asked Jul 15 '15 by pr338


People also ask

How do you find the median in PySpark in SQL?

We can define our own UDF in PySpark and use the Python library NumPy, which has a method that calculates the median of an array. The DataFrame is first grouped by a column value; after grouping, the column whose median needs to be calculated is collected as a list, and the UDF computes the median of that list.
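A minimal sketch of that approach might look like this (the DataFrame df and the column names "group" and "value" are placeholders, not from the question):

from pyspark.sql import functions as F, types as T
import numpy as np

# A UDF that reduces a collected list of values to its median.
median_udf = F.udf(lambda xs: float(np.median(xs)), T.DoubleType())

medians = (df
    .groupBy("group")
    .agg(F.collect_list("value").alias("values"))
    .withColumn("median", median_udf("values")))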

How do you find the median in SQL?

To get the median we have to use PERCENTILE_CONT(0.5). If you want to define a specific set of rows grouped to get the median, then use the OVER (PARTITION BY) clause. Here I've used PARTITION BY on the column OrderID so as to find the median of unit prices for the order ids.

How are PySpark percentiles calculated?

In order to calculate the percentile rank of a column in PySpark, we use the percent_rank() function. percent_rank(), together with a window partitioned by another column via partitionBy(), calculates the percentile rank of the column by group.
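For example (df and the column names "group" and "value" are placeholders):

from pyspark.sql import Window, functions as F

# percent_rank() over a window partitioned by "group" and ordered by "value"
w = Window.partitionBy("group").orderBy("value")
ranked = df.withColumn("percentile_rank", F.percent_rank().over(w))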


1 Answer

Ongoing work

SPARK-30569 - Add DSL functions invoking percentile_approx

Spark 2.0+:

You can use the approxQuantile method, which implements the Greenwald-Khanna algorithm:

Python:

df.approxQuantile("x", [0.5], 0.25) 

Scala:

df.stat.approxQuantile("x", Array(0.5), 0.25) 

where the last parameter is the relative error. The lower the number, the more accurate the result and the more expensive the computation.
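For example, turning the RDD of integers from the question into a single-column DataFrame first (the column name "x" and the relative error of 0.01 are arbitrary choices here; 0.0 would give the exact median at a higher cost):

df = rdd.map(lambda x: (float(x), )).toDF(["x"])
median = df.approxQuantile("x", [0.5], 0.01)[0]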

Since Spark 2.2 (SPARK-14352) it supports estimation on multiple columns:

df.approxQuantile(["x", "y", "z"], [0.5], 0.25) 

and

df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25) 

The underlying method can also be used in SQL aggregation (both global and grouped) via the approx_percentile function:

> SELECT approx_percentile(10.0, array(0.5, 0.4, 0.1), 100);
 [10.0,10.0,10.0]
> SELECT approx_percentile(10.0, 0.5, 100);
 10.0
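A grouped aggregation works the same way; for example (the table and column names here are just placeholders):

spark.sql("""
    SELECT group_id, approx_percentile(x, 0.5, 100) AS median_x
    FROM some_table
    GROUP BY group_id
""")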

Spark < 2.0

Python

As I've mentioned in the comments, it is most likely not worth all the fuss. If the data is relatively small, like in your case, then simply collect and compute the median locally:

import numpy as np

np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, size=700000))

%time np.median(rdd.collect())
np.array(rdd.collect()).nbytes

It takes around 0.01 second on my few years old computer and around 5.5MB of memory.

If the data is much larger, sorting will be a limiting factor, so instead of getting an exact value it is probably better to sample, collect, and compute locally. But if you really want to use Spark, something like this should do the trick (if I didn't mess up anything):

import time

import numpy as np
from numpy import floor


def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1]
    :rdd a numeric rdd
    :p quantile (between 0 and 1)
    :sample fraction of the rdd to use. If not provided we use the whole dataset
    :seed random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    rddSortedWithIndex = (rdd
        .sortBy(lambda x: x)
        .zipWithIndex()
        .map(lambda xi: (xi[1], xi[0]))  # (index, value) so we can look up by position
        .cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p

    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(x)[0]
        for x in int(floor(h)) + np.array([0, 1]))

    return rddX + (h - floor(h)) * (rddXPlusOne - rddX)

And some tests:

np.median(rdd.collect()), quantile(rdd, 0.5)
## (500184.5, 500184.5)
np.percentile(rdd.collect(), 25), quantile(rdd, 0.25)
## (250506.75, 250506.75)
np.percentile(rdd.collect(), 75), quantile(rdd, 0.75)
## (750069.25, 750069.25)

Finally, let's define median:

from functools import partial

median = partial(quantile, p=0.5)
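median(rdd) is now equivalent to quantile(rdd, 0.5):

median(rdd)
## 500184.5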

So far so good, but it takes 4.66 s in local mode without any network communication. There is probably a way to improve this, but why even bother?

Language independent (Hive UDAF):

If you use HiveContext you can also use Hive UDAFs. With integral values:

rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df")

sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")

With continuous values:

sqlContext.sql("SELECT percentile(x, 0.5) FROM df") 

In percentile_approx you can pass an additional argument which determines the number of records to use.
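For example (the value 10000 here is just illustrative):

sqlContext.sql("SELECT percentile_approx(x, 0.5, 10000) FROM df")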

answered Oct 13 '22 by zero323