Spark - Non-time-based windows are not supported on streaming DataFrames/Datasets;

I need to write a Spark SQL query with an inner select and partition by. The problem is that I get an AnalysisException. I have already spent a few hours on this, but other approaches have not worked either.

Exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
Window [sum(cast(_w0#41 as bigint)) windowspecdefinition(deviceId#28, timestamp#30 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS grp#34L], [deviceId#28], [timestamp#30 ASC NULLS FIRST]
+- Project [currentTemperature#27, deviceId#28, status#29, timestamp#30, wantedTemperature#31, CASE WHEN (status#29 = cast(false as boolean)) THEN 1 ELSE 0 END AS _w0#41]

I assume the query is too complicated to implement this way, but I don't know how to fix it.

 SparkSession spark = SparkUtils.getSparkSession("RawModel");

 Dataset<RawModel> datasetMap = readFromKafka(spark);

 datasetMap.registerTempTable("test");

 Dataset<Row> res = datasetMap.sqlContext().sql(
                " select deviceId, grp, avg(currentTemperature) as averageT, min(timestamp) as minTime, max(timestamp) as maxTime, count(*) as countFrame " +
                " from (select test.*, sum(case when status = 'false' then 1 else 0 end) over (partition by deviceId order by timestamp) as grp " +
                "       from test " +
                "      ) test " +
                " group by deviceId, grp ");
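For reference, the inner windowed sum is a classic gaps-and-islands grouping: each row whose status is 'false' starts a new group per device. On a static (non-streaming) dataset the same logic can be sketched in plain Python (field names mirror the query; this is an illustration, not Spark API):

```python
from itertools import groupby
from operator import itemgetter

def assign_groups(rows):
    """Simulate: sum(case when status = 'false' then 1 else 0 end)
    over (partition by deviceId order by timestamp) as grp."""
    out = []
    rows = sorted(rows, key=itemgetter("deviceId", "timestamp"))
    for _device, device_rows in groupby(rows, key=itemgetter("deviceId")):
        grp = 0
        for row in device_rows:
            if row["status"] == "false":  # a 'false' status starts a new island
                grp += 1
            out.append({**row, "grp": grp})
    return out

rows = [
    {"deviceId": "d1", "timestamp": 1, "status": "true",  "currentTemperature": 20},
    {"deviceId": "d1", "timestamp": 2, "status": "false", "currentTemperature": 21},
    {"deviceId": "d1", "timestamp": 3, "status": "true",  "currentTemperature": 22},
]

print([r["grp"] for r in assign_groups(rows)])  # -> [0, 1, 1]
```

Rows sharing a grp value within a device can then be aggregated, which is what the outer group by deviceId, grp does.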

Any suggestion would be much appreciated. Thank you.

Raskolnikov asked Nov 14 '18 07:11

People also ask

Can I use Spark for Streaming data?

In fact, you can apply Spark's machine learning and graph processing algorithms on data streams. Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.
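The micro-batch model described above can be sketched as a toy loop (pure Python, not the Spark API; batch_size stands in for the batch interval):

```python
def micro_batch(stream, batch_size, process):
    """Toy illustration of Spark Streaming's micro-batch model:
    the live stream is divided into batches, and each batch is
    handed to the processing engine to produce a batch of results."""
    results = []
    batch = []
    for record in stream:
        batch.append(record)
        if len(batch) == batch_size:
            results.append(process(batch))
            batch = []
    if batch:  # flush the final partial batch
        results.append(process(batch))
    return results

print(micro_batch(range(7), 3, sum))  # -> [3, 12, 6]
```

In real Spark Streaming the batches are cut by time interval rather than record count, but the shape of the computation is the same.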

What is data Streaming How Spark Streaming support data Streaming?

Spark Streaming is an extension of the core Spark API that allows data engineers and data scientists to process real-time data from various sources including (but not limited to) Kafka, Flume, and Amazon Kinesis. This processed data can be pushed out to file systems, databases, and live dashboards.

Which of the library supports real-time Streaming in Spark?

Kinesis: Spark Streaming 3.3.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.

What is the difference between Spark Streaming and structured Streaming?

Spark receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the SparkSQL API for data stream processing. In the end, all the APIs are optimized using Spark catalyst optimizer and translated into RDDs for execution under the hood.




1 Answer

I believe the issue is in the windowing specification:

over (partition by deviceId order by timestamp) 

The partition would need to be over a time-based column, in your case timestamp. The following should work:

over (partition by timestamp order by timestamp) 

That will of course not address the intent of your query. The following might be attempted, though it is unclear whether Spark supports it:

over (partition by timestamp, deviceId order by timestamp) 

Even if Spark does support it, it would still change the semantics of your query.
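An alternative that streaming Datasets do support is aggregating over a time-based window with the built-in window() grouping function instead of a row-based analytic window. A sketch, assuming a 10-minute bucket (the interval is an arbitrary choice, and this changes the grouping semantics from "runs between status flips" to fixed time buckets):

```sql
select deviceId,
       window(timestamp, '10 minutes') as timeWindow,
       avg(currentTemperature) as averageT,
       min(timestamp) as minTime,
       max(timestamp) as maxTime,
       count(*) as countFrame
from test
group by deviceId, window(timestamp, '10 minutes')
```

Note that with append output mode, streaming aggregations like this also require a watermark on the timestamp column.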

Update

Here is a definitive source from Tathagata Das, a core committer on Spark Streaming: http://apache-spark-user-list.1001560.n3.nabble.com/Does-partition-by-and-order-by-works-only-in-stateful-case-td31816.html


WestCoastProjects answered Oct 24 '22 17:10