Serialization issues in Spark Streaming

I'm quite confused about how Spark works with the data under the hood. For example, when I run a streaming job and apply foreachRDD, the behaviour depends on whether a variable is captured from the outer scope or initialised inside.

val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
    // sparkConf is captured from the enclosing scope
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

In this case, I get an exception:

java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.

But if I obtain sparkConf inside the closure instead, from the RDD's own SparkContext, everything seems to be fine:

dStream.foreachRDD(rdd => {
    // the config is derived inside the closure, from the rdd argument itself
    val sparkConf = rdd.sparkContext.getConf
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

This looks quite odd to me because I thought that foreachRDD runs on the driver node, so I didn't expect any difference.

Now, if I move both the SQL session and the config outside foreachRDD, it works fine again:

val sparkConf = new SparkConf()
// both the config and the session are created once, on the driver
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    val df = spark.read.json(rdd)
    ...
})

A snippet in the Spark documentation suggests the previous approach (where both the config and the SQL context are created inside foreachRDD), which seems less efficient to me: why create them for every batch if they could be created just once?
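For reference, the documented pattern I'm referring to looks roughly like this (reconstructed from memory, so the details may differ):

dStream.foreachRDD(rdd => {
    // both the config and the session are obtained inside the closure,
    // once per batch
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    val df = spark.read.json(rdd)
    ...
})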

Could someone explain why the exception is thrown and what is the proper way to create the SQL context?

asked May 04 '26 by lizarisk

1 Answer

foreachRDD runs, as the name suggests, once for each RDD in the stream, so why would you recreate the Spark session for every batch? The correct approach is the last one:

val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    // reuse the single session created on the driver
    val df = spark.read.json(rdd)
    ...
})
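As for the exception: it most likely means that in your first snippet sparkConf is not a plain local variable but a field of an object that also holds the DStream. In Scala, referring to a field from inside a closure captures the whole enclosing instance, so the DStream gets dragged into the closure, and serializing the DStream graph then fails. A hypothetical sketch of that structure (the class and field names are mine, not from your code):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

class StreamingJob(dStream: DStream[String]) {
    // a field of the enclosing instance
    val sparkConf = new SparkConf()

    def run(): Unit = {
        dStream.foreachRDD(rdd => {
            // `sparkConf` here really means `this.sparkConf`, so the closure
            // captures the whole StreamingJob instance -- including dStream.
            // Serializing the closure then tries to serialize the DStream
            // and throws NotSerializableException.
            val spark = SparkSession.builder.config(sparkConf).getOrCreate()
        })
    }
}

Your second snippet works because everything is derived from the rdd argument inside the closure, so nothing from the enclosing instance is captured. And even the documented per-batch pattern is not as wasteful as it looks: getOrCreate returns the already-existing session after the first batch.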
answered May 06 '26 by eugenio calabrese


