My data looks like this:
id | val
----------------
a1 | 10
a1 | 20
a2 | 5
a2 | 7
a2 | 2
I am trying to delete the row that has MAX(val) in each group when grouping on "id".
The result should be:
id | val
----------------
a1 | 10
a2 | 5
a2 | 2
I am using Spark DataFrames with SQLContext. I need something like:
DataFrame df = sqlContext.sql("SELECT * FROM jsontable WHERE (id, val) NOT IN (SELECT id, MAX(val) FROM jsontable GROUP BY id)");
How can I do that?
You can do that using DataFrame operations and window functions. Assuming you have your data in the DataFrame df1:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Per-group maximum of "val", computed over a window partitioned by "id"
val maxOnWindow = max(col("val")).over(Window.partitionBy(col("id")))

// Keep only rows whose "val" is strictly below the group maximum,
// then drop the helper column
val df2 = df1
  .withColumn("max", maxOnWindow)
  .where(col("val") < col("max"))
  .select("id", "val")
In Java, the equivalent would be something like:
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;

// Per-group maximum of "val" over a window partitioned by "id"
Column maxOnWindow = max(col("val")).over(Window.partitionBy("id"));

DataFrame df2 = df1
    .withColumn("max", maxOnWindow)
    .where(col("val").lt(col("max")))
    .select("id", "val");
Here's a nice article about window functions: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
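If you'd rather stay close to the SQL from the question, the same filter can be expressed with a window function in Spark SQL. A sketch (note: on Spark 1.x, window functions require a HiveContext rather than a plain SQLContext):

df1.registerTempTable("jsontable")

// Keep rows strictly below each group's maximum, mirroring the
// DataFrame version above
val df2 = sqlContext.sql(
  """SELECT id, val
    |FROM (SELECT id, val,
    |             MAX(val) OVER (PARTITION BY id) AS max_val
    |      FROM jsontable) t
    |WHERE val < max_val""".stripMargin)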
Below is a Java implementation of Mario's Scala code (using groupBy and a join instead of a window function):
DataFrame df = sqlContext.read().json(input);

// Per-group maxima: one row per id, carrying that group's maximum val
DataFrame dfMaxRaw = df.groupBy("id").max("val");
DataFrame dfMax = dfMaxRaw.select(
    dfMaxRaw.col("id").as("max_id"), dfMaxRaw.col("max(val)").as("max_val")
);

// Attach each row's group maximum by joining on the id
DataFrame combineMaxWithData = df.join(dfMax, df.col("id")
    .equalTo(dfMax.col("max_id")));

// Keep rows whose val differs from the group maximum (the join already
// guarantees id == max_id), then drop the helper columns
DataFrame finalResult = combineMaxWithData
    .filter(combineMaxWithData.col("val")
        .notEqual(combineMaxWithData.col("max_val")))
    .select("id", "val");
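As a side note, on Spark 2.0+ the same join-based idea can be written more compactly as a left anti-join, which keeps only the rows that have no matching (id, group max) pair. A sketch in Scala, assuming df1 holds the data:

import org.apache.spark.sql.functions._

// One row per id, carrying that group's maximum val
val maxes = df1.groupBy("id").agg(max("val").as("max_val"))

// Keep rows of df1 with no match on (id, val == group max),
// i.e. drop each group's maximum; "left_anti" needs Spark 2.0+
val result = df1.join(maxes,
  df1("id") === maxes("id") && df1("val") === maxes("max_val"),
  "left_anti")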