How to use Analytic/Window Functions in Spark Java?

I'm trying to use the analytic/window function last_value in Spark Java.

Netezza Query:

select sno, name, addr1, addr2, run_dt, 
last_value(addr1 ignore nulls) over (partition by sno, name, addr1, addr2, run_dt order by beg_ts , end_ts rows between unbounded preceding and unbounded following  ) as last_addr1
from daily

We want to implement this query in Spark Java (without using HiveContext):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.WindowFunctionFrame;

    SparkConf conf = new SparkConf().setMaster("local").setAppName("Agg");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);


    JavaRDD<Stgdailydtl> daily = sc.textFile("C:\\Testing.txt").map(
              new Function<String, Stgdailydtl>() {
                  private static final long serialVersionUID = 1L;
                public Stgdailydtl call(String line) throws Exception {
                  String[] parts = line.split(",");

                  Stgdailydtl daily = new Stgdailydtl();
                  daily.setSno(Integer.parseInt(parts[0].trim()));
                  .....

                  return daily;
                }
              });
    DataFrame schemaDailydtl = sqlContext.createDataFrame(daily, Stgdailydtl.class);
    schemaDailydtl.registerTempTable("daily");
    WindowSpec ws = Window.partitionBy("sno, name, addr1, addr2, run_dt").orderBy("beg_ts , end_ts").rowsBetween(0, 100000);
    DataFrame df = sqlContext.sql("select sno, name, addr1, addr2, run_dt "
            + "row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt order by beg_cptr_ts, end_cptr_ts) from daily ");

}

}

Error:

Exception in thread "main" java.lang.RuntimeException: [1.110] failure: ``union'' expected but `(' found

select stg.mach_id, stg.msrmt_gbl_id, stg.msrmt_dsc, stg.elmt_dsc, stg.elmt_dsc_grp_concat, row_number() over(partition by mach_id, msrmt_gbl_id, msrmt_dsc, elmt_dsc, end_cptr_dt order by beg_cptr_ts, end_cptr_ts) from stgdailydtl stg 
                                                                                                             ^
    at scala.sys.package$.error(package.scala:27)

I could not understand how to use the WindowSpec/Window objects. Please advise. Thanks for your help.

asked Oct 24 '15 by ND


People also ask

What is window unboundedPreceding?

The frame is unbounded at the start if the start boundary is Window.unboundedPreceding, or any value less than or equal to -9223372036854775808 (Long.MIN_VALUE). The frame is unbounded at the end if the end boundary is Window.unboundedFollowing, or any value greater than or equal to 9223372036854775807 (Long.MAX_VALUE). Both boundaries are inclusive.
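For example, a frame that spans the whole partition can be set up like this in Java (a minimal sketch; the column names sno and beg_ts are assumptions, not from the question):

import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// Frame covering the whole partition: unbounded preceding .. unbounded following.
// Long.MIN_VALUE / Long.MAX_VALUE are the numeric equivalents of
// Window.unboundedPreceding / Window.unboundedFollowing (named constants in Spark 2.1+).
WindowSpec fullPartition = Window
        .partitionBy("sno")      // assumed partition column
        .orderBy("beg_ts")       // assumed ordering column
        .rowsBetween(Long.MIN_VALUE, Long.MAX_VALUE);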

What is rank in spark?

RANK in Spark calculates the rank of a value within a group of values. It returns one plus the number of rows preceding or equal to the current row in the ordering of the partition.
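A small sketch of rank in Java (assuming a DataFrame df with columns sno and beg_ts):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.rank;

// Rank rows within each sno partition by beg_ts; tied rows share a rank
// and the following rank is skipped (1, 1, 3, ...).
WindowSpec bySno = Window.partitionBy("sno").orderBy("beg_ts");
DataFrame ranked = df.withColumn("rnk", rank().over(bySno));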

What is lead and lag in spark SQL?

lead returns the value that is offset rows after the current row, or defaultValue if there are fewer than offset rows after the current row. lag returns the value that is offset rows before the current row, or null (or defaultValue, if one was given) if there are fewer than offset rows before it.
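A brief sketch in Java (column names are assumptions, not from the question):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.lag;
import static org.apache.spark.sql.functions.lead;

WindowSpec w = Window.partitionBy("sno").orderBy("beg_ts");

// next_addr1 is null on the last row of each partition (no row after it),
// prev_addr1 is null on the first row of each partition (no row before it).
DataFrame withNeighbours = df
        .withColumn("next_addr1", lead("addr1", 1).over(w))
        .withColumn("prev_addr1", lag("addr1", 1).over(w));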

What are analytical functions windowing functions?

A window function, also known as an analytic function, computes values over a group of rows and returns a single result for each row. This is different from an aggregate function, which returns a single result for a group of rows.
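To make the difference concrete, a small sketch (the amt column is an assumption):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;
import static org.apache.spark.sql.functions.sum;

// Aggregate function: one output row per sno group.
DataFrame totals = df.groupBy("sno").agg(sum("amt").as("total_amt"));

// Window (analytic) function: the same group total is attached to every input row.
DataFrame withTotals = df.withColumn(
        "total_amt",
        sum("amt").over(Window.partitionBy("sno")));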


1 Answer

You're mixing DataFrame syntax and SQL syntax: specifically, you created a WindowSpec but then never used it.

Import org.apache.spark.sql.functions to get the row_number function, then create the column that you're trying to select:

Column rowNum = functions.row_number().over(ws);

Then select it using the dataframe API:

df.select(each, column, you, want, rowNum)

My syntax may be slightly off (I'm used to Scala or Python), but the gist is something like that.
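For reference, a fuller sketch of this approach in Java, applied to the schemaDailydtl DataFrame from the question (a sketch, not the answerer's exact code; column names follow the question's Netezza query, and the ignore-nulls variant of last is only available from Spark 2.0):

import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.last;
import static org.apache.spark.sql.functions.row_number;

// Partitioning and ordering taken from the original Netezza query.
WindowSpec ordered = Window
        .partitionBy("sno", "name", "addr1", "addr2", "run_dt")
        .orderBy("beg_ts", "end_ts");

// last_value needs a frame that extends to the end of the partition;
// row_number keeps the default frame, so it uses the plain spec.
WindowSpec fullFrame = ordered.rowsBetween(Long.MIN_VALUE, Long.MAX_VALUE);

Column rowNum    = row_number().over(ordered);
Column lastAddr1 = last("addr1").over(fullFrame); // Netezza's IGNORE NULLS needs last(col("addr1"), true) on Spark 2.0+

DataFrame result = schemaDailydtl.select(
        col("sno"), col("name"), col("addr1"), col("addr2"), col("run_dt"),
        rowNum.as("rn"),
        lastAddr1.as("last_addr1"));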

answered Oct 18 '22 by Tristan Reid