I want to create a new MongoDB RDD each time I enter foreachRDD. However, I am running into serialization issues:
mydstream.foreachRDD(rdd => {
  val mongoClient = MongoClient("localhost", 27017)
  val db = mongoClient(mongoDatabase)
  val coll = db(mongoCollection)
  // ssc is my StreamingContext
  val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList)
})
This will give me an error:
object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)
Any ideas?
You might try to use rdd.context, which returns the SparkContext the RDD was created with (on a DStream, .context returns the StreamingContext instead).
mydstream.foreachRDD { rdd =>
  val mongoClient = MongoClient("localhost", 27017)
  val db = mongoClient(mongoDatabase)
  val coll = db(mongoCollection)
  val modelsRDDRaw = rdd.context.parallelize(coll.find().toList)
}
Actually, RDD also has a .sparkContext method; as far as I can tell, .context is just an alias that returns the same SparkContext.
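A quick way to check this from inside the block above (a minimal sketch, using the same mydstream as in the question):

  // Both accessors hand back the same driver-side SparkContext object
  mydstream.foreachRDD { rdd =>
    assert(rdd.context eq rdd.sparkContext)
  }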
In my understanding, when you have a "not serializable" object, you need to create it inside foreachPartition, so that the connection to the database is opened on each worker node before you run your processing.
mydstream.foreachRDD(rdd => {
  rdd.foreachPartition(partition => {
    // The client is created on the executor, so nothing needs to be serialized
    val mongoClient = MongoClient("localhost", 27017)
    val db = mongoClient(mongoDatabase)
    val coll = db(mongoCollection)
    // SparkContext exists only on the driver: no parallelize here,
    // so keep the query result as a plain local List
    val models = coll.find().toList
    partition.foreach(record => { /* combine record with models as needed */ })
    mongoClient.close()
  })
})
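If the goal is still to read the collection's contents back into the stream's processing (as in the question), a per-partition lookup with mapPartitions keeps everything on the executors. This is only a sketch, assuming the Casbah driver and the same mydstream, mongoDatabase, and mongoCollection values as above; the (record, models.length) pairing is just a placeholder for whatever lookup you actually need:

  import com.mongodb.casbah.Imports._

  val enriched = mydstream.transform { rdd =>
    rdd.mapPartitions { partition =>
      // One client per partition, created on the executor
      val mongoClient = MongoClient("localhost", 27017)
      val coll = mongoClient(mongoDatabase)(mongoCollection)
      val models = coll.find().toList // materialized List, safe to use after close
      mongoClient.close()
      partition.map(record => (record, models.length))
    }
  }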