I need to write a Spark SQL query with an inner select and a partition by clause, but it fails with an AnalysisException. I have already spent a few hours on this, and the other approaches I tried did not work either.
Exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
Window [sum(cast(_w0#41 as bigint)) windowspecdefinition(deviceId#28, timestamp#30 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS grp#34L], [deviceId#28], [timestamp#30 ASC NULLS FIRST]
+- Project [currentTemperature#27, deviceId#28, status#29, timestamp#30, wantedTemperature#31, CASE WHEN (status#29 = cast(false as boolean)) THEN 1 ELSE 0 END AS _w0#41]
I assume this query is too complicated to implement like this, but I don't know how to fix it.
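Code:
SparkSession spark = SparkUtils.getSparkSession("RawModel");
Dataset<RawModel> datasetMap = readFromKafka(spark);
// Register the streaming Dataset as a temporary view so it can be queried with SQL
// (createOrReplaceTempView replaces the deprecated registerTempTable)
datasetMap.createOrReplaceTempView("test");
// Inner select: running count of status = 'false' rows per device, used as a group id.
// Outer select: aggregates per (deviceId, grp).
Dataset<Row> res = spark.sql(
    " select deviceId, grp, avg(currentTemperature) as averageT, min(timestamp) as minTime, max(timestamp) as maxTime, count(*) as countFrame " +
    " from (select test.*, sum(case when status = 'false' then 1 else 0 end) over (partition by deviceId order by timestamp) as grp " +
    "       from test " +
    "      ) test " +
    " group by deviceId, grp ");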
Any suggestions would be much appreciated. Thank you.
I believe the issue is in the windowing specification:
over (partition by deviceId order by timestamp)
The partition would need to be over a time-based column, in your case timestamp. The following may work:
over (partition by timestamp order by timestamp)
That will not, of course, address the intent of your query. The following might be attempted, but it is unclear whether Spark would support it:
over (partition by timestamp, deviceId order by timestamp)
Even if Spark does support that, it would still change the semantics of your query.
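To make the change in semantics concrete, here is a minimal plain-Java sketch (no Spark involved) of what the original running sum over (partition by deviceId order by timestamp) computes. The Reading class and the assignGroups method are hypothetical names introduced only for illustration, and the sketch assumes the rows are already sorted by timestamp with no duplicate timestamps per device.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (not Spark code) of the grp column in the original query.
class RunningGroups {

    // Hypothetical stand-in for one row of the "test" view.
    static final class Reading {
        final String deviceId;
        final long timestamp;
        final boolean status;
        final double currentTemperature;

        Reading(String deviceId, long timestamp, boolean status, double currentTemperature) {
            this.deviceId = deviceId;
            this.timestamp = timestamp;
            this.status = status;
            this.currentTemperature = currentTemperature;
        }
    }

    // For each row, returns the number of status == false rows seen so far
    // for the same device, including the current row.
    // Assumption: the input is sorted by timestamp and timestamps are unique per device.
    static List<Long> assignGroups(List<Reading> sortedByTimestamp) {
        Map<String, Long> falseCountPerDevice = new HashMap<>();
        List<Long> groups = new ArrayList<>();
        for (Reading r : sortedByTimestamp) {
            long count = falseCountPerDevice.getOrDefault(r.deviceId, 0L);
            if (!r.status) {
                count++; // a false status starts a new group for this device
            }
            falseCountPerDevice.put(r.deviceId, count);
            groups.add(count);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Reading> rows = Arrays.asList(
            new Reading("a", 1L, true, 20.0),
            new Reading("a", 2L, false, 20.5),
            new Reading("a", 3L, true, 21.0),
            new Reading("a", 4L, true, 21.5),
            new Reading("a", 5L, false, 22.0));
        System.out.println(assignGroups(rows)); // prints [0, 1, 1, 1, 2]
    }
}
```

The counter is kept per device and carries across all timestamps, which is what lets grp identify consecutive runs of readings. Partitioning by timestamp instead would restart the counter for every distinct timestamp, so grp would no longer describe runs per device.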
Update
Here is a definitive source from Tathagata Das, a core committer on Spark Streaming: http://apache-spark-user-list.1001560.n3.nabble.com/Does-partition-by-and-order-by-works-only-in-stateful-case-td31816.html