import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.eventhubs.EventHubsUtils
import sqlContext.implicits._
val ehParams = Map[String, String](
"eventhubs.policyname" -> "Full",
...
)
val ssc = new StreamingContext(sc, Seconds(2))
val stream = EventHubsUtils.createUnionStream(ssc, ehParams)
val cr = stream.window(Seconds(6))
case class Message(msg: String)
stream.map(msg=>Message(new String(msg))).foreachRDD(rdd=>rdd.toDF().registerTempTable("temp"))
stream.print
ssc.start
This above starts and runs fine but I cannot seem to stop it. Any call to %sql show tables will just freeze.
How do i stop the StreamingContext above ?
ssc.stop
also kills the Spark Context, requiring an interpreter restart.
Use ssc.stop(stopSparkContext=false, stopGracefully=true)
instead.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With