I'm trying to use the analytic/window function last_value in Spark Java, to reproduce this SQL:
select sno, name, addr1, addr2, run_dt,
last_value(addr1 ignore nulls) over (partition by sno, name, addr1, addr2, run_dt order by beg_ts , end_ts rows between unbounded preceding and unbounded following ) as last_addr1
from daily

Here is my code so far:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.WindowFunctionFrame;
SparkConf conf = new SparkConf().setMaster("local").setAppName("Agg");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);

JavaRDD<Stgdailydtl> daily = sc.textFile("C:\\Testing.txt").map(
        new Function<String, Stgdailydtl>() {
            private static final long serialVersionUID = 1L;

            public Stgdailydtl call(String line) throws Exception {
                String[] parts = line.split(",");
                Stgdailydtl daily = new Stgdailydtl();
                daily.setSno(Integer.parseInt(parts[0].trim()));
                // ..... (remaining setters elided)
                return daily;
            }
        });

DataFrame schemaDailydtl = sqlContext.createDataFrame(daily, Stgdailydtl.class);
schemaDailydtl.registerTempTable("daily");

WindowSpec ws = Window.partitionBy("sno, name, addr1, addr2, run_dt")
        .orderBy("beg_ts , end_ts")
        .rowsBetween(0, 100000);

DataFrame df = sqlContext.sql("select sno, name, addr1, addr2, run_dt, "
        + "row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt "
        + "order by beg_cptr_ts, end_cptr_ts) from daily");

This fails with:
Exception in thread "main" java.lang.RuntimeException: [1.110] failure: ``union'' expected but `(' found
select stg.mach_id, stg.msrmt_gbl_id, stg.msrmt_dsc, stg.elmt_dsc, stg.elmt_dsc_grp_concat, row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt order by beg_cptr_ts, end_cptr_ts) from stgdailydtl stg
^
at scala.sys.package$.error(package.scala:27)
I can't work out how to use the WindowSpec/Window objects. Could you suggest what I'm doing wrong? Thanks for your help.
For reference, rowsBetween(start, end) is defined as follows: start is the boundary start, inclusive; the frame is unbounded if this is Window.unboundedPreceding or any value less than or equal to -9223372036854775808 (Long.MIN_VALUE). end is the boundary end, inclusive; the frame is unbounded if this is Window.unboundedFollowing or any value greater than or equal to 9223372036854775807 (Long.MAX_VALUE).
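For example, the frame from the original query (the whole partition) can be expressed like this. This is a sketch against the Spark 1.x DataFrame API, where Long.MIN_VALUE/Long.MAX_VALUE stand in for the unbounded constants; Spark 2.1+ also exposes Window.unboundedPreceding() and Window.unboundedFollowing():

import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// Frame spanning the entire partition, i.e.
// "rows between unbounded preceding and unbounded following".
WindowSpec fullPartition = Window
        .partitionBy("sno", "name", "addr1", "addr2", "run_dt")
        .orderBy("beg_ts", "end_ts")
        .rowsBetween(Long.MIN_VALUE, Long.MAX_VALUE);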
RANK in Spark calculates the rank of a value in a group of values. It returns one plus the number of rows preceding, or equal to, the current row in the ordering of the partition.
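A minimal sketch of rank over a window (the bySno spec, the rnk column name, and the choice of ordering column are just for illustration):

import static org.apache.spark.sql.functions.rank;

WindowSpec bySno = Window.partitionBy("sno").orderBy("beg_ts");
// Ties get the same rank and the following rank is skipped: 1, 1, 3, ...
// (dense_rank() would produce 1, 1, 2, ... instead).
DataFrame ranked = schemaDailydtl.withColumn("rnk", rank().over(bySno));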
lead returns the value that is offset rows after the current row, and defaultValue if there are fewer than offset rows after the current row. lag is its mirror image: it returns the value offset rows before the current row, and defaultValue (null when none is supplied) if there are fewer than offset rows before it in the window partition.
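In Java that looks roughly like this (the column names and the "n/a" default are just for illustration):

import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.lag;
import static org.apache.spark.sql.functions.lead;

WindowSpec w = Window.partitionBy("sno").orderBy("beg_ts");
DataFrame withNeighbors = schemaDailydtl
        // addr1 of the previous row in the partition, or null for the first row
        .withColumn("prev_addr1", lag("addr1", 1).over(w))
        // addr1 of the next row, or "n/a" for the last row of the partition
        .withColumn("next_addr1", lead("addr1", 1, "n/a").over(w));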
A window function, also known as an analytic function, computes values over a group of rows and returns a single result for each row. This is different from an aggregate function, which returns a single result for a group of rows.
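To make the contrast concrete, here is a small sketch (the max aggregate and the perGroup/annotated names are illustrative; it assumes run_dt is a comparable column):

import static org.apache.spark.sql.functions.max;

// Aggregate: collapses the rows, one output row per sno.
DataFrame perGroup = schemaDailydtl.groupBy("sno").agg(max("run_dt"));

// Window: all input rows survive, each annotated with its group's maximum.
DataFrame annotated = schemaDailydtl.withColumn(
        "max_run_dt", max("run_dt").over(Window.partitionBy("sno")));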
You're mixing DataFrame syntax and SQL syntax: specifically, you created a WindowSpec but then never used it. (Two smaller problems with that spec: partitionBy and orderBy take each column name as a separate argument, not one comma-separated string, and rowsBetween(0, 100000) is not an unbounded frame. Also, as far as I remember, the plain SQLContext parser in Spark 1.x doesn't support window functions in SQL at all, which is why it fails at over(; you'd need a HiveContext there, or Spark 2.0+.)
Import org.apache.spark.sql.functions to get the row_number function, then create the column you're trying to select:

Column rowNum = functions.row_number().over(ws);
Then select it using the DataFrame API:
df.select(each, column, you, want, rowNum)
My syntax may be slightly off; I'm used to Scala or Python, but the gist is something like that.
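Putting that together for your original last_value query, here's an untested sketch. It assumes Spark 2.0+, where functions.last accepts an ignoreNulls flag (on 2.x the DataFrame type is Dataset<Row>); on 1.x you'd lose the ignore-nulls behavior:

import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame; // Dataset<Row> on Spark 2.x
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.last;

// Partition and order exactly as in the SQL, with an unbounded frame so
// last() sees the whole partition rather than only rows up to the current one.
WindowSpec ws = Window
        .partitionBy("sno", "name", "addr1", "addr2", "run_dt")
        .orderBy("beg_ts", "end_ts")
        .rowsBetween(Long.MIN_VALUE, Long.MAX_VALUE);

// last(col, true) skips nulls, mirroring last_value(addr1 ignore nulls);
// the boolean overload exists from Spark 2.0 on.
Column lastAddr1 = last(schemaDailydtl.col("addr1"), true).over(ws).as("last_addr1");

DataFrame result = schemaDailydtl.select(
        schemaDailydtl.col("sno"),
        schemaDailydtl.col("name"),
        schemaDailydtl.col("addr1"),
        schemaDailydtl.col("addr2"),
        schemaDailydtl.col("run_dt"),
        lastAddr1);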