How to stop spark streaming when the data source has run out

I have a Spark Streaming job that reads from Kafka every 5 seconds, does some transformation on the incoming data, and then writes to the file system.

This doesn't really need to be a streaming job; really, I just want to run it once a day to drain the messages onto the file system. I'm not sure how to stop the job, though.

If I pass a timeout to streamingContext.awaitTermination, it doesn't stop the process; all it does is cause the process to spawn errors when it comes time to iterate on the stream (see the error below).

What is the best way to accomplish what I'm trying to do?

This is for Spark 1.6 with Python.

EDIT:

Thanks to @marios, the solution was this:

ssc.start()
ssc.awaitTermination(10)
ssc.stop()

That runs the script for ten seconds before stopping.

Simplified code:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("Vehicle Data Consolidator").set('spark.files.overwrite', 'true')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 5)  # 5-second batch interval

# kafkaParams and topicPartitions are defined elsewhere (omitted here)
stream = KafkaUtils.createStream(
    ssc,
    kafkaParams["zookeeper.connect"],
    "vehicle-data-importer",
    topicPartitions,
    kafkaParams)

stream.saveAsTextFiles('stream-output/kafka-vehicle-data')

ssc.start()
ssc.awaitTermination(10)

Error:

16/01/29 15:05:44 INFO BlockManagerInfo: Added input-0-1454097944200 in memory on localhost:58960 (size: 3.0 MB, free: 48.1 MB)
16/01/29 15:05:44 WARN BlockManager: Block input-0-1454097944200 replicated to only 0 peer(s) instead of 1 peers
16/01/29 15:05:44 INFO BlockGenerator: Pushed block input-0-1454097944200
16/01/29 15:05:45 ERROR JobScheduler: Error generating jobs for time 1454097945000 ms
py4j.Py4JException: Cannot obtain a new communication channel
    at py4j.CallbackClient.sendCommand(CallbackClient.java:232)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:111)
    at com.sun.proxy.$Proxy14.call(Unknown Source)
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:230)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/01/29 15:05:45 INFO MemoryStore: Block input-0-1454097944800 stored as bytes in memory (estimated size 3.0 MB, free 466.1 MB)
16/01/29 15:05:45 INFO BlockManagerInfo: Added input-0-1454097944800 in memory on localhost:58960 (size: 3.0 MB, free: 45.1 MB)
asked Jan 29 '16 by lostinplace


People also ask

How do I stop Spark Streaming?

In client mode, your application (the Spark driver) runs on the server where you issued the spark-submit command; to stop the application, just press Ctrl-C. This exits the application and returns you to your command prompt.

Do Spark Streaming programs typically run continuously?

Users specify a streaming computation by writing a batch computation (using Spark's DataFrame/Dataset API), and the engine automatically incrementalizes this computation (runs it continuously).

What sources can the data in Spark Streaming come from?

Spark Streaming supports data sources such as HDFS directories, TCP sockets, Kafka, Flume, Twitter, etc. Data streams can be processed with Spark's core APIs, DataFrames/SQL, or machine learning APIs, and can be persisted to a file system, HDFS, databases, or any data source offering a Hadoop OutputFormat.

Which property must a Spark Structured Streaming sink possess to ensure end-to-end exactly-once semantics?

Exactly-once semantics are possible only if the source is replayable and the sink is idempotent.
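
As a rough illustration (not from this page), an idempotent sink in the DStream API used elsewhere here could key its output on the batch time, so that a replayed batch writes to the same deterministic location instead of producing duplicate output. The path prefix and helper name below are made up, and stream is the DStream from the question:

def write_batch(batch_time, rdd):
    # batch_time is the batch's datetime; deriving the output path from it
    # makes the write deterministic, so a replayed batch targets the same
    # directory instead of appending a new one. A real sink would also have
    # to overwrite or skip output that already exists at that path.
    if not rdd.isEmpty():
        path = 'stream-output/batch-%s' % batch_time.strftime('%Y%m%d-%H%M%S')
        rdd.saveAsTextFile(path)

stream.foreachRDD(write_batch)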




2 Answers

It seems that the right method to call is awaitTerminationOrTimeout(self, timeout).

I am not sure if it also stops the streaming context, so maybe call ssc.stop() right after the timeout ends.

ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop()

Note: Take a look here for a similar question.
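
For example, a minimal sketch of that (the 10-second timeout is just an example; ssc is the streaming context from the question):

ssc.start()

# Block for up to 10 seconds; awaitTerminationOrTimeout returns True only if
# the streaming context stopped on its own within that window.
stopped = ssc.awaitTerminationOrTimeout(10)
if not stopped:
    # stopGraceFully=True lets batches that were already received finish
    # processing; stopSparkContext=True also tears down the SparkContext.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)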

answered Oct 12 '22 by marios


Try the Kafka "consumer.timeout.ms" parameter, which will gracefully end the KafkaReceiver (from the Kafka 0.8 configuration):

Throw a timeout exception to the consumer if no message is available for consumption after the specified interval

HDF = KafkaUtils.createStream(ssc, topics={strLoc : 1}, kafkaParams={"consumer.timeout.ms":"20000" }, zkQuorum='xxx:2181', groupId='xxx-consumer-group')

Once the timeout fires, you will not be able to receive any new Kafka messages in the current streaming execution and will always get empty RDDs. Check the count of empty RDDs in DStream.foreachRDD(func), and terminate the streaming execution if you continuously get empty RDDs.
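
A rough sketch of that check (the 3-batch threshold, the 5-second poll, and the helper names are just for illustration; stream and ssc are from the question's code):

# Track consecutive empty batches on the driver; foreachRDD callbacks run in
# the driver process, so a plain dict works as shared state.
empty_batches = {'count': 0}

def track_empty(batch_time, rdd):
    if rdd.isEmpty():
        empty_batches['count'] += 1
    else:
        empty_batches['count'] = 0  # reset as soon as data arrives again

stream.foreachRDD(track_empty)
stream.saveAsTextFiles('stream-output/kafka-vehicle-data')

ssc.start()
# Poll instead of blocking forever; stop after 3 consecutive empty batches.
while not ssc.awaitTerminationOrTimeout(5):
    if empty_batches['count'] >= 3:
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
        break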

answered Oct 13 '22 by Shawn Guo