I'm trying to stream CSV files from a folder on my local machine (OSX). I'm using SparkSession and StreamingContext together like so:
val sc: SparkContext = createSparkContext(sparkContextName)
val sparkSess = SparkSession.builder().config(sc.getConf).getOrCreate()
val ssc = new StreamingContext(sparkSess.sparkContext, Seconds(time))
val csvSchema = new StructType().add("field_name",StringType)
val inputDF = sparkSess.readStream.format("org.apache.spark.csv").schema(csvSchema).csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")
If I run ssc.start() after this, I get this error:
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
Instead, if I try to start the SparkSession like this:
inputDF.writeStream.format("console").start()
I get:
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
Clearly I'm not understanding how SparkSession and StreamingContext should work together. If I get rid of SparkSession, StreamingContext only has textFileStream, on which I need to impose a CSV schema. I would appreciate any clarification on how to get this working.
You should not create a SparkContext and a SparkSession separately. Spark 2.0.0 introduced a new abstraction for developers, the SparkSession, which is instantiated and used much like the SparkContext that was previously available, and which wraps a SparkContext internally.
You can still access the SparkContext from the SparkSession:
val sparkSess = SparkSession.builder().appName("My App").getOrCreate()
val sc = sparkSess.sparkContext
val ssc = new StreamingContext(sc, Seconds(time))
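One more thing that is causing your job to fail: you define a transformation but never start any output. readStream belongs to Structured Streaming, which runs independently of StreamingContext, so ssc.start() finds no DStream output operations registered. A streaming DataFrame also does not support batch actions such as inputDF.show(). Instead, start the query with inputDF.writeStream.format("console").start() and call awaitTermination() on the returned query so the application keeps running.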
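Putting this together, here is a minimal sketch of the Structured Streaming route, using only the SparkSession and no StreamingContext. The schema and folder path are taken from the question. The object name CsvStreamSketch and the local[*] master are assumptions for illustration, so adjust them to your setup.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical object name, used only for this sketch.
object CsvStreamSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: running locally, so the master is set to local[*].
    val sparkSess = SparkSession.builder()
      .appName("My App")
      .master("local[*]")
      .getOrCreate()

    // File-based streaming sources need an explicit schema.
    val csvSchema = new StructType().add("field_name", StringType)

    // Each new CSV file that appears in the folder is picked up as new rows.
    val inputDF = sparkSess.readStream
      .schema(csvSchema)
      .csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")

    // Starting the query is what triggers execution; show() is not
    // available on a streaming DataFrame.
    val query = inputDF.writeStream
      .format("console")
      .start()

    // Block the main thread so the application keeps processing new files.
    query.awaitTermination()
  }
}
```

With this layout there is no ssc.start() call at all: the streaming query is started by writeStream.start(), and the console sink prints each micro-batch as new files arrive in the folder.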