
How to use SparkSession and StreamingContext together?

I'm trying to stream CSV files from a folder on my local machine (OSX). I'm using SparkSession and StreamingContext together like so:

val sc: SparkContext = createSparkContext(sparkContextName)
val sparkSess = SparkSession.builder().config(sc.getConf).getOrCreate()
val ssc = new StreamingContext(sparkSess.sparkContext, Seconds(time))

val csvSchema = new StructType().add("field_name",StringType)
val inputDF = sparkSess.readStream.format("org.apache.spark.csv").schema(csvSchema).csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")

If I run ssc.start() after this, I get this error:

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

Instead if I try to start the SparkSession like this:


I get:

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

Clearly I'm not understanding how SparkSession and StreamingContext should work together. If I get rid of SparkSession, StreamingContext only has textFileStream on which I need to impose a CSV schema. Would appreciate any clarifications on how to get this working.

covfefe asked Dec 24 '22 08:12


1 Answer

You do not need to create a SparkContext and a SparkSession separately. Spark 2.0.0 introduced a new abstraction, the SparkSession, which is instantiated and used much like the SparkContext that was previously available, and which creates and wraps a SparkContext internally.

You can still access the SparkContext from the SparkSession itself:

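 import org.apache.spark.sql.SparkSession
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 val sparkSess = SparkSession.builder().appName("My App").getOrCreate()
 val sc = sparkSess.sparkContext  // the context owned by the session, not a second one
 val ssc = new StreamingContext(sc, Seconds(time))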
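If you want to stay with the DStream API, the two contexts above can be combined roughly as follows. This is only a sketch under a few assumptions: the object name CsvDStream, the local[*] master and the 10-second batch interval are made up for illustration, and parsing a Dataset[String] with spark.read.csv requires Spark 2.2 or later. Each micro-batch of lines from textFileStream is turned into a DataFrame with your CSV schema inside foreachRDD, which also counts as the output operation that the StreamingContext needs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CsvDStream {
  // Same single-column schema as in the question.
  val csvSchema: StructType = new StructType().add("field_name", StringType)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CsvDStream")   // hypothetical application name
      .master("local[*]")      // assumption: running locally
      .getOrCreate()
    import spark.implicits._   // provides the String encoder used by toDS()

    // Build the StreamingContext on the session's own SparkContext.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) // assumed batch interval

    // Directory taken from the question; only files that appear after start() are read.
    val lines = ssc.textFileStream("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")

    // foreachRDD is an output operation, so ssc.start() has something to execute.
    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Parse the raw lines of this batch as CSV with the schema imposed.
        val df = spark.read.schema(csvSchema).csv(rdd.toDS())
        df.show()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Here show() is fine because each df is an ordinary batch DataFrame built from one micro-batch, not a streaming DataFrame.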

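One more thing that is causing your job to fail is that you only define a transformation and never start any output. ssc.start() fails with "No output operations registered" because nothing was registered on the StreamingContext: inputDF comes from readStream, which belongs to Structured Streaming and does not run through the StreamingContext at all. Note that an action such as inputDF.show() does not work on a streaming DataFrame; it has to be started as a query, for example inputDF.writeStream.format("console").start(), followed by awaitTermination() on the returned query.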
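A minimal Structured Streaming version of your job might look like the sketch below. It uses only the SparkSession and no StreamingContext; the object name CsvStructuredStream, the local[*] master and the console sink are assumptions chosen for illustration, while the schema and directory come from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object CsvStructuredStream {
  // Same single-column schema as in the question.
  val csvSchema: StructType = new StructType().add("field_name", StringType)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CsvStructuredStream") // hypothetical application name
      .master("local[*]")             // assumption: running locally
      .getOrCreate()

    // Streaming file sources need an explicit schema; csv() sets the format itself.
    val inputDF = spark.readStream
      .schema(csvSchema)
      .csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")

    // Nothing runs until a sink is attached and the query is started.
    val query = inputDF.writeStream
      .format("console")    // print each micro-batch to stdout
      .outputMode("append")
      .start()

    // Block the main thread so the stream keeps running.
    query.awaitTermination()
  }
}
```

The console sink is only meant for trying things out; any other supported sink can be swapped in at the format(...) call.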

Munna answered Jan 22 '23 00:01
