
Spark Streaming: foreachRDD update my mongo RDD

I want to create a new MongoDB RDD each time I enter foreachRDD. However, I have serialization issues:

 mydstream
   .foreachRDD(rdd => {
     val mongoClient = MongoClient("localhost", 27017)
     val db = mongoClient(mongoDatabase)
     val coll = db(mongoCollection)
     // ssc is my StreamingContext
     val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList)
   })

This will give me an error:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)

Any idea?

asked Jan 15 '16 by thomas legrand

2 Answers

You might try to use rdd.context, which returns the SparkContext the RDD was created on (on a DStream, by contrast, .context returns the StreamingContext).

mydstream.foreachRDD { rdd =>
  val mongoClient = MongoClient("localhost", 27017)
  val db = mongoClient(mongoDatabase)
  val coll = db(mongoCollection)
  // rdd.context gives the SparkContext without capturing the outer ssc,
  // so the closure no longer drags the StreamingContext in
  val modelsRDDRaw = rdd.context.parallelize(coll.find().toList)
}

Actually, RDD also has a .sparkContext method. The two are equivalent: .context is just an alias that returns the same SparkContext.
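
For what it's worth, in the Spark source both methods return the same underlying SparkContext, roughly like this (paraphrased from RDD.scala, so treat the exact signatures as an approximation):

    // Paraphrased from org.apache.spark.rdd.RDD: both expose the
    // SparkContext the RDD was created on, so they are interchangeable
    def sparkContext: SparkContext = sc
    def context: SparkContext = sc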

answered by Markon


In my understanding, if you have a "not serializable" object, you need to create it inside foreachPartition, so that the connection to the database is made on each node before running your processing.

mydstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Create the client on the worker: it is built per partition, so it
    // is never serialized and shipped from the driver
    val mongoClient = MongoClient("localhost", 27017)
    val db = mongoClient(mongoDatabase)
    val coll = db(mongoCollection)
    // Note: ssc.sparkContext.parallelize cannot be called here; this block
    // runs on an executor, and RDDs can only be created on the driver
    partition.foreach { record =>
      // process each record against coll, e.g. coll.insert(...)
    }
  }
}
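
A common extension of this pattern (a sketch, assuming the same Casbah-style MongoClient API as in the question; the MongoConnection object is a hypothetical helper of my own naming) is to hold one client per executor JVM in a lazily initialized singleton, so the connection is reused across batches instead of being reopened for every partition:

    import com.mongodb.casbah.Imports._

    // Hypothetical helper: a Scala object lives once per JVM, so `client`
    // is created lazily on each executor and never serialized by Spark
    object MongoConnection {
      lazy val client: MongoClient = MongoClient("localhost", 27017)
    }

    mydstream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val coll = MongoConnection.client(mongoDatabase)(mongoCollection)
        partition.foreach { record =>
          // Assumption: each record can be stored as a simple document
          coll.insert(MongoDBObject("value" -> record.toString))
        }
      }
    }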
answered by Rami