I have the sample table below. I want to get the median of the "value" column for each group in the "source" column, where the "source" column is of String data type and the "value" column is of double data type.
scala> sqlContext.sql("SELECT * from tTab order by source").show
+---------------+-----+
| Source|value|
+---------------+-----+
|131.183.222.110| 1.0|
| 131.183.222.85| 1.0|
| 131.183.222.85| 0.0|
| 131.183.222.85| 0.5|
| 131.183.222.85| 1.0|
| 131.183.222.85| 1.0|
| 43.230.146.7| 0.0|
| 43.230.146.7| 1.0|
| 43.230.146.7| 1.0|
| 43.230.146.8| 1.0|
| 43.230.146.8| 1.0|
+---------------+-----+
scala> tTab.printSchema
root
|-- Source: string (nullable = true)
|-- value: double (nullable = true)
Expected Answer:
+---------------+-----+
| Source|value|
+---------------+-----+
|131.183.222.110| 1.0|
| 131.183.222.85| 1.0|
| 43.230.146.7| 1.0|
| 43.230.146.8| 1.0|
+---------------+-----+
If "value" column would have been Int, below query is working. Since "value" is of data type as double, it is giving me error:
sqlContext.sql("SELECT source , percentile(value,0.5) OVER (PARTITION BY source) AS Median from tTab ").show
Error:
org.apache.hadoop.hive.ql.exec.NoMatchingMethodException: No matching method for class org.apache.hadoop.hive.ql.udf.UDAFPercentile with (double, double). Possible choices: _FUNC_(bigint, array<double>) _FUNC_(bigint, double)
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getMethodInternal(FunctionRegistry.java:1164)
at org.apache.hadoop.hive.ql.exec.DefaultUDAFEvaluatorResolver.getEvaluatorClass(DefaultUDAFEvaluatorResolver.java:83)
at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge.getEvaluator(GenericUDAFBridge.java:56)
at org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver.getEvaluator(AbstractGenericUDAFResolver.java:47)
at org.apache.spark.sql.hive.HiveWindowFunction.evaluator$lzycompute(hiveUDFs.scala:351)
at org.apache.spark.sql.hive.HiveWindowFunction.evaluator(hiveUDFs.scala:349)
at org.apache.spark.sql.hive.HiveWindowFunction.returnInspector$lzycompute(hiveUDFs.scala:357)
at org.apache.spark.sql.hive.HiveWindowFunction.returnInspector(hiveUDFs.scala:356)
at org.apache.spark.sql.hive.HiveWindowFunction.dataType(hiveUDFs.scala:362)
at org.apache.spark.sql.catalyst.expressions.WindowExpression.dataType(windowExpressions.scala:313)
at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:140)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35$$anonfun$apply$15.applyOrElse(Analyzer.scala:856)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35$$anonfun$apply$15.applyOrElse(Analyzer.scala:852)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35.apply(Analyzer.scala:852)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35.apply(Analyzer.scala:863)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.org$apache$spark$sql$catalyst$analysis$Analyzer$ExtractWindowExpressions$$addWindow(Analyzer.scala:849)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$16.applyOrElse(Analyzer.scala:957)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$16.applyOrElse(Analyzer.scala:913)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.apply(Analyzer.scala:913)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.apply(Analyzer.scala:745)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC.<init>(<console>:35)
at $iwC.<init>(<console>:37)
at <init>(<console>:39)
at .<init>(<console>:43)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Thank you so much!
For non-integral values you should use the percentile_approx UDF:
import org.apache.spark.mllib.random.RandomRDDs
import sqlContext.implicits._ // for .toDF; already in scope in spark-shell

// Throwaway DataFrame of 1000 normally distributed doubles
val df = RandomRDDs.normalRDD(sc, 1000, 10, 1).map(Tuple1(_)).toDF("x")
df.registerTempTable("df")

sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df").show

// +--------------------+
// |                 _c0|
// +--------------------+
// |0.035379710486199915|
// +--------------------+
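If a tighter estimate is needed, Hive's percentile_approx also accepts an optional third argument, the number of histogram bins used by the approximation (default 10000); a higher value is more accurate at the cost of memory. A minimal sketch against the same df table:

// Optional accuracy knob: the third argument is the number of histogram bins
sqlContext.sql("SELECT percentile_approx(x, 0.5, 100000) FROM df").show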
On a side note, you should use GROUP BY, not PARTITION BY. The latter is used for window functions and has a different effect than you expect.
SELECT source, percentile_approx(value, 0.5) FROM df GROUP BY source
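Applied to the table from the question, the same pattern might look like this (a sketch; it assumes sqlContext has Hive support, which appears to be the case since the percentile UDAF was resolved above), and the result should match the expected output shown in the question:

sqlContext.sql(
  """SELECT source, percentile_approx(value, 0.5) AS median
    |FROM tTab
    |GROUP BY source""".stripMargin).show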
See also How to find median using Spark
Here is how it can be done using Spark Scala DataFrame functions. This is based on how Imputer implements the median strategy in Spark >= 2.2 (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Imputer.scala):
// colName is the name of the target column, e.g. "value";
// 0.5 is the probability for the median, 0.001 the relative error
df.select(colName)
  .stat
  .approxQuantile(colName, Array(0.5), 0.001)
  .head
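approxQuantile computes a single global quantile, so it answers the per-group question only if you filter to one group at a time. For a per-group median in one pass, one option (a sketch, assuming Spark >= 2.1 where percentile_approx is available as a built-in SQL function) is to call it through the DataFrame API:

import org.apache.spark.sql.functions.expr

// One approximate median per distinct Source value
val medians = df
  .groupBy("Source")
  .agg(expr("percentile_approx(value, 0.5)").as("median"))

medians.show()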