Im trying to run Spark Structured Streaming job and save checkpoint to Google Storage, I have a couple of jobs, one w/o aggregation works perfectly, but second with aggregations throw exception. I found that someone have similar issues with checkpointing on S3 because S3 doesn't support read after write semantics https://blog.yuvalitzchakov.com/improving-spark-streaming-checkpoint-performance-with-aws-efs/, but GS does and everything should be ok, I will be glad if anybody will share their experience with checkpointing.
val writeToKafka = stream.writeStream
.format("kafka")
.trigger(ProcessingTime(5000))
.option("kafka.bootstrap.servers", "localhost:29092")
.option("topic", "test_topic")
.option("checkpointLocation", "gs://test/check_test/Job1")
.start()
Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 3402a8361b734732
[Executor task launch worker for task 1] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Committed partition 0 (task 1, attempt 0stage 1.0)
[Executor task launch worker for task 1] INFO org.apache.spark.sql.execution.streaming.CheckpointFileManager - Writing atomically to gs://test/check_test/Job1/state/0/0/1.delta using temp file gs://test/check_test/Job1/state/0/0/.1.delta.8a93d644-0d8e-4cb9-82b5-6418b9e63ffd.TID1.tmp
[Executor task launch worker for task 1] ERROR org.apache.spark.TaskContextImpl - Error in TaskCompletionListener
java.lang.NullPointerException
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.write(GoogleHadoopOutputStream.java:114)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:261)
at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:193)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:274)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$cancelDeltaFile(HDFSBackedStateStoreProvider.scala:508)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.abort(HDFSBackedStateStoreProvider.scala:150)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:65)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1$$anonfun$apply$1.apply(package.scala:64)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:137)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Executor task launch worker for task 1] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:137)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[task-result-getter-1] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:137)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[task-result-getter-1] ERROR org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 1.0 failed 1 times; aborting job
[task-result-getter-1] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 1.0, whose tasks have all completed, from pool
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage 1
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Killing all running tasks in stage 1: Stage cancelled
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ResultStage 1 (start at Job1.scala:53) failed in 9.863 s due to Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:137)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
[stream execution thread for [id = f130d772-fc9e-4b0f-a81e-942af0741ae9, runId = 7dc1cb33-c5f2-4ebe-8707-251de2503ee1]] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 failed: start at Job1.scala:53, took 20.926657 s
[stream execution thread for [id = f130d772-fc9e-4b0f-a81e-942af0741ae9, runId = 7dc1cb33-c5f2-4ebe-8707-251de2503ee1]] ERROR org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@228cec9e is aborting.
[stream execution thread for [id = f130d772-fc9e-4b0f-a81e-942af0741ae9, runId = 7dc1cb33-c5f2-4ebe-8707-251de2503ee1]] ERROR org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec - Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@228cec9e aborted.
[stream execution thread for [id = f130d772-fc9e-4b0f-a81e-942af0741ae9, runId = 7dc1cb33-c5f2-4ebe-8707-251de2503ee1]] ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query [id = f130d772-fc9e-4b0f-a81e-942af0741ae9, runId = 7dc1cb33-c5f2-4ebe-8707-251de2503ee1] terminated with error
org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:137)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
... 35 more
Caused by: org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:137)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = f130d772-fc9e-4b0f-a81e-942af0741ae9, runId = 7dc1cb33-c5f2-4ebe-8707-251de2503ee1]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[NormalizedEvents]]: {"NormalizedEvents":{"0":46564}}}
Current State: ACTIVE
Thread State: RUNNABLE
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.SparkException: Writing job aborted.
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:137)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
... 35 more
Caused by: org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:137)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Thread-1] INFO org.apache.spark.SparkContext - Invoking stop() from shutdown hook
[Thread-1] INFO org.spark_project.jetty.server.AbstractConnector - Stopped Spark@1ce93c18{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
[Thread-1] INFO org.apache.spark.ui.SparkUI - Stopped Spark web UI at http://10.25.12.222:4041
[dispatcher-event-loop-0] INFO org.apache.spark.MapOutputTrackerMasterEndpoint - MapOutputTrackerMasterEndpoint stopped!
[Thread-1] INFO org.apache.spark.storage.memory.MemoryStore - MemoryStore cleared
[Thread-1] INFO org.apache.spark.storage.BlockManager - BlockManager stopped
[Thread-1] INFO org.apache.spark.storage.BlockManagerMaster - BlockManagerMaster stopped
[dispatcher-event-loop-1] INFO org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint - OutputCommitCoordinator stopped!
[Thread-1] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
[Thread-1] INFO org.apache.spark.util.ShutdownHookManager - Shutdown hook called
[Thread-1] INFO org.apache.spark.util.ShutdownHookManager - Deleting directory /private/var/folders/_t/7m21x7313gs74_yfv4txsr69b8yh87/T/temporaryReader-75fdf46f-7de0-4ca7-9c77-8bd034e4f5a3
[Thread-1] INFO org.apache.spark.util.ShutdownHookManager - Deleting directory /private/var/folders/_t/7m21x7313gs74_yfv4txsr69b8yh87/T/spark-bde783f1-fa66-420f-87e7-5c1895ab7ccc
A checkpoint helps build fault-tolerant and resilient Spark applications. In Spark Structured Streaming, it maintains intermediate state on HDFS compatible file systems to recover from failures. To specify the checkpoint in a streaming query, we use the checkpointLocation parameter.
If by "gracefully" you mean that the streaming query should complete processing of data, then void stop() will not do that. It will just wait until the threads performing execution has stopped (as mentioned in the documentation).
There are two types of Apache Spark checkpointing: Reliable Checkpointing – It refers to that checkpointing in which the actual RDD is saved in reliable distributed file system, e.g. HDFS. To set the checkpoint directory call: SparkContext. setCheckpointDir(directory: String).
Spark Streaming jobs checkpointing to Google Cloud Storage was fixed. This fix will be included in GCS connector 2.1.4 and 2.2.0 releases.
You cannot use GCS as checkpoint store if you make aggregations in your stream, at least in version 2.1.3 (hadoop 2). It's perfectly fine if your transforms doesn't include any groupBy, but if that's the case, you should save your checkpoints in HDFS or something else.
I got the same issue trying to write a stream to GCS in Spark 2.4.4. There is no problem using GCS as writestream, but i got same null pointer exception when using GCS as checkpoint location. As I am running spark over Google Dataproc, i can use dataproc HDFS capabilities of the nodes.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With