Update collection in MongoDB via Apache Spark using Mongo-Hadoop connector

I would like to update a specific collection in MongoDB via Spark in Java. I am using the MongoDB Connector for Hadoop to retrieve and save information from Apache Spark to MongoDB in Java.

After following Sampo Niskanen's excellent post on retrieving and saving collections to MongoDB via Spark, I got stuck when it came to updating a collection.

MongoOutputFormat.java includes a constructor taking String[] updateKeys, which I am guessing refers to a list of keys to match against existing documents when performing an update. However, since Spark's saveAsNewAPIHadoopFile() method takes MongoOutputFormat.class as a parameter rather than an instance, I am wondering how that update constructor can ever be used.

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

Prior to this, MongoUpdateWritable.java was used to perform collection updates. From the Hadoop examples I've seen, this is normally set as mongo.job.output.value, which in Spark might look like this:

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);

However, I'm still wondering how to specify the update keys with MongoUpdateWritable.

Admittedly, as a hack, I've set the object's "_id" to a concatenation of my document's key/value pairs, so that performing a save overwrites any document whose _id matches:

JavaPairRDD<BSONObject, ?> analyticsResult; // JavaPairRDD of (mongoObject, result)
JavaPairRDD<Object, BSONObject> save = analyticsResult.mapToPair(s -> {
    BSONObject o = s._1;

    // for all keys, build a composite _id of the form "key:value_"
    String id = "";
    for (String key : o.keySet()) {
        id += key + ":" + o.get(key) + "_";
    }
    o.put("_id", id);

    o.put("result", s._2);
    return new Tuple2<>(null, o);
});

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

I would like to perform the MongoDB collection update from Spark using MongoOutputFormat, MongoUpdateWritable, or Configuration, ideally with the saveAsNewAPIHadoopFile() method. Is that possible? If not, is there another way that doesn't involve setting the _id to the exact key values I want to update on?

asked by dyltini

1 Answer

I tried several combinations of config.set("mongo.job.output.value", "....") together with several variants of

.saveAsNewAPIHadoopFile(
        "file:///bogus",
        classOf[Any],
        classOf[Any],
        classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
        mongo_config
      )

and none of them worked.

I got it to work by using the MongoUpdateWritable class as the output value of my map method:

// assumes these imports from mongo-hadoop and the BSON driver
import org.bson.BasicBSONObject
import org.bson.types.ObjectId
import com.mongodb.hadoop.MongoOutputFormat
import com.mongodb.hadoop.io.MongoUpdateWritable

items.map(row => {
  // query: match the existing document by its ObjectId
  val mongo_id = new ObjectId(row("id").toString)
  val query = new BasicBSONObject()
  query.append("_id", mongo_id)

  // update: $set only the field that changed
  val update = new BasicBSONObject()
  update.append("$set", new BasicBSONObject().append("field_name", row("new_value")))

  // flags: upsert = false, multi-update = true
  val muw = new MongoUpdateWritable(query, update, false, true)
  (null, muw)
})
  .saveAsNewAPIHadoopFile(
    "file:///bogus",
    classOf[Any],
    classOf[Any],
    classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
    mongo_config
  )
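
Here mongo_config is just a Hadoop Configuration that points the connector at the target collection. A minimal sketch, assuming a local MongoDB instance and the connector's mongo.output.uri key (the host, database, and collection names are placeholders):

import org.apache.hadoop.conf.Configuration

// minimal sketch: tell the connector which collection to write to;
// host, database, and collection are placeholders
val mongo_config = new Configuration()
mongo_config.set("mongo.output.uri", "mongodb://localhost:27017/db.users")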

The raw update executed in Mongo then looks something like this:

2014-11-09T13:32:11.609-0800 [conn438] update db.users query: { _id: ObjectId('5436edd3e4b051de6a505af9') } update: { $set: { value: 10 } } nMatched:1 nModified:0 keyUpdates:0 numYields:0 locks(micros) w:24 3ms
answered by Francesco Laurita