I have a Spark Streaming job that reads from Kafka every 5 seconds, does some transformation on the incoming data, and then writes to the file system.
This doesn't really need to be a streaming job; really, I just want to run it once a day to drain the messages onto the file system. I'm not sure how to stop the job, though.
If I pass a timeout to streamingContext.awaitTermination, it doesn't stop the process; all it does is cause the process to throw errors when it comes time to iterate on the stream (see the error below).
What is the best way to accomplish what I'm trying to do?
This is for Spark 1.6 with Python.
EDIT:
Thanks to @marios, the solution was this:
ssc.start()
ssc.awaitTermination(10)
ssc.stop()
That runs the script for ten seconds before stopping.
Simplified code:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite', 'true')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)  # 5-second batch interval
stream = KafkaUtils.createStream(
    ssc,
    kafkaParams["zookeeper.connect"],
    "vehicle-data-importer",
    topicPartitions,
    kafkaParams)
stream.saveAsTextFiles('stream-output/kafka-vehicle-data')
ssc.start()
ssc.awaitTermination(10)
Error:
16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200
16/01/29 15:05:45 ERROR JobScheduler: Error generating jobs for time 1454097945000 ms
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:232)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:111)
at com.sun.proxy.$Proxy14.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:230)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/01/29 15:05:45 INFO MemoryStore: Block input-0-1454097944800 stored as bytes in memory (estimated size 3.0 MB, free 466.1 MB)
16/01/29 15:05:45 INFO BlockManagerInfo: Added input-0-1454097944800 in memory on localhost:58960 (size: 3.0 MB, free: 45.1 MB)
Kill an application running in client mode: in client mode, your application (the Spark driver) runs on the server where you issue the spark-submit command. To stop the application in this mode, just press Ctrl-C. This exits the application and returns you to your command prompt.
With Structured Streaming, users specify a streaming computation by writing a batch computation (using Spark's DataFrame/Dataset API), and the engine automatically incrementalizes this computation (runs it continuously).
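As an illustration of that model, here is a minimal sketch assuming Spark 2.x with Structured Streaming and the Kafka source package available; the broker, topic, and path names are placeholders, not part of the original question, and trigger(once=True) is one way to get the "drain once a day" behaviour without a long-running job:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredKafkaDrain").getOrCreate()

# Read Kafka as an unbounded DataFrame; the query is written exactly like a batch query.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
      .option("subscribe", "vehicle-data")                   # placeholder topic
      .load())

values = df.selectExpr("CAST(value AS STRING)")

# trigger(once=True) processes whatever is available, then stops the query.
query = (values.writeStream
         .format("text")
         .option("path", "stream-output/structured-vehicle-data")   # placeholder path
         .option("checkpointLocation", "stream-output/checkpoint")  # placeholder path
         .trigger(once=True)
         .start())

query.awaitTermination()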
Spark Streaming supports data sources such as HDFS directories, TCP sockets, Kafka, Flume, Twitter, etc. Data streams can be processed with Spark's core APIs, DataFrames and SQL, or machine learning APIs, and can be persisted to a filesystem, HDFS, databases, or any data store that offers a Hadoop OutputFormat.
Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.
It seems that the right method to call is awaitTerminationOrTimeout(self, timeout).
I am not sure whether it also stops the streaming context, so you may want to call ssc.stop() right after the timeout ends:
ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop()
Note: Take a look here for a similar question.
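For completeness, a slightly fuller sketch of the same pattern; the graceful-shutdown arguments are my assumption and not part of the answer above:
ssc.start()
# awaitTerminationOrTimeout returns True if the context stopped, False if the timeout elapsed
stopped = ssc.awaitTerminationOrTimeout(10)
if not stopped:
    # stopGraceFully lets any in-flight batches finish before shutting down
    ssc.stop(stopSparkContext=True, stopGraceFully=True)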
Try the Kafka "consumer.timeout.ms" parameter, which will gracefully end the KafkaReceiver (from the Kafka 0.8 configuration):
"Throw a timeout exception to the consumer if no message is available for consumption after the specified interval."
HDF = KafkaUtils.createStream(ssc, topics={strLoc: 1}, kafkaParams={"consumer.timeout.ms": "20000"}, zkQuorum='xxx:2181', groupId='xxx-consumer-group')
After that, you will not be able to receive any new Kafka messages in the current streaming execution and will always get empty RDDs.
Then check the count of empty RDDs in DStream.foreachRDD(func), and terminate the streaming execution if you continuously get empty RDDs.
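A hypothetical sketch of that check, reusing the ssc and stream variables from the question's simplified code; the threshold, poll interval, and variable names are illustrative only:
empty_batches = {'count': 0}
MAX_EMPTY = 3  # stop after this many consecutive empty micro-batches (arbitrary choice)

def track_empty(rdd):
    # runs on the driver once per micro-batch
    if rdd.isEmpty():
        empty_batches['count'] += 1
    else:
        empty_batches['count'] = 0

stream.foreachRDD(track_empty)
stream.saveAsTextFiles('stream-output/kafka-vehicle-data')

ssc.start()
# poll from the main thread so stop() is never called from inside the foreachRDD callback
while not ssc.awaitTerminationOrTimeout(5):
    if empty_batches['count'] >= MAX_EMPTY:
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
        break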