I have two streaming queries, as shown below:
val streamingQuery = injectableDependencies.writeStreamDfToDeltaFile(validDF, dataPath, checkPointPath, configuration.pollingTimerSeconds, queryName, configuration.outputMode, configuration.partitionBy)
//streamingQuery for Invalid tables
val streamingQueryInvalid = injectableDependencies.writeStreamDfToDeltaFile(inValidDF, dataPathInvalid, checkPointPath, configuration.pollingTimerSeconds, queryNameInvalid, configuration.outputMode, configuration.partitionBy)
//creating valid tables
injectableDependencies.createTableOverDeltaFile(configuration.tableName, configuration.databaseName, dataPath, streamingQuery)
//creating invalid tables
injectableDependencies.createTableOverDeltaFile("Invalid"+configuration.tableName, configuration.databaseName, dataPath, streamingQueryInvalid)
This is how I am trying to write the streams:
df
.writeStream
.format("DELTA")
.option("path", dataPath)
.option("checkpointLocation", checkPointPath)
.partitionBy(partitionBy.getOrElse(List[String]()): _*)
.outputMode(outputMode)
.trigger(Trigger.ProcessingTime(pollingTimerSeconds.seconds))
.queryName(queryName)
.start()
I am getting the error below:
ERROR Uncaught throwable from user code: java.lang.IllegalStateException: Cannot start query with id 74e6b948-bc55-419c-af42-34ef7ea015ba as another query with same id is already active. Perhaps you are attempting to restart a query from the checkpoint that is already active.
Can anybody please suggest what is going wrong?
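I had the same issue and solved it by making sure each streaming query gets its own dedicated checkpointLocation as an option in the writeStream call. In your code both queries are started with the same checkPointPath, so the second query picks up the query id stored in the first query's checkpoint and Spark refuses to start it.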
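A minimal sketch of that fix, assuming the write helper looks roughly like the snippet in the question. The object name CheckpointPerQuery and the helper checkpointFor are hypothetical, and the parameter types (for example pollingTimerSeconds as an Int) are guesses, since the original signature is not shown. The idea is simply to derive a separate checkpoint directory per query name under a shared base path:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import scala.concurrent.duration._

object CheckpointPerQuery {

  // Hypothetical helper: builds a checkpoint directory that is unique per query
  // by appending the query name to a shared base path.
  def checkpointFor(baseCheckpointPath: String, queryName: String): String =
    s"${baseCheckpointPath.stripSuffix("/")}/$queryName"

  // Assumed shape of writeStreamDfToDeltaFile; parameter types are guesses.
  def writeStreamDfToDeltaFile(
      df: DataFrame,
      dataPath: String,
      baseCheckpointPath: String,
      pollingTimerSeconds: Int,
      queryName: String,
      outputMode: String,
      partitionBy: Option[List[String]]): StreamingQuery =
    df.writeStream
      .format("delta")
      .option("path", dataPath)
      // Each query now writes its id and offsets to its own directory.
      .option("checkpointLocation", checkpointFor(baseCheckpointPath, queryName))
      .partitionBy(partitionBy.getOrElse(Nil): _*)
      .outputMode(outputMode)
      .trigger(Trigger.ProcessingTime(pollingTimerSeconds.seconds))
      .queryName(queryName)
      .start()
}
```

With this, calling the helper once with queryName and once with queryNameInvalid gives the two queries different checkpoint directories even though the same base path is passed in. Passing two explicitly different paths (for example a separate checkPointPathInvalid) would work just as well. Note that the existing checkpoint directory already holds the first query's state, so the second query should point at a fresh location.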