
How to query Mongo using Spark?

I am using Spark and Mongo. I am able to connect to Mongo using the following code:

import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject

val sc = new SparkContext("local", "Hello from scala")

val config = new Configuration()
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/dbName.collectionName")
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

The above code gives me all documents from the collection.

Now I want to apply some conditions to the query.

For that I used:

config.set("mongo.input.query","{customerId: 'some mongo id'}")

This takes only one condition at a time. I also want to add a condition like 'usage' > 30.

1) How do I add multiple conditions (including greater-than and less-than) to a Mongo query using Spark?

I also want to iterate over each document in the query result using Scala.

2) How do I iterate through the result using Scala?

asked Dec 17 '14 by Vishwas

2 Answers

Hi, you can try this:

There is a project that integrates MongoDB with Spark:

https://github.com/Stratio/deep-spark/tree/develop

1) Do a git clone.

2) Go inside deep-spark, then into deep-parent.

3) Run mvn install.

4) Open spark-shell with these options:

./spark-shell --jars YOUR_PATH/deep-core-0.7.0-SNAPSHOT.jar,YOUR_PATH/deep-commons-0.7.0-SNAPSHOT.jar,YOUR_PATH/deep-mongodb-0.7.0-SNAPSHOT.jar,YOUR_PATH/mongo-java-driver-2.12.4.jar

Remember to replace "YOUR_PATH" with the real path.

5) Execute a simple example in the spark shell:

import com.stratio.deep.mongodb.config.MongoDeepJobConfig
import com.stratio.deep.mongodb.extractor.MongoNativeDBObjectExtractor
import com.stratio.deep.core.context.DeepSparkContext
import com.mongodb.DBObject
import org.apache.spark.rdd.RDD
import com.mongodb.QueryBuilder
import com.mongodb.BasicDBObject

// Connection details
val host = "localhost:27017"
val database = "test"
val inputCollection = "input"

// Deep context wraps the existing SparkContext
val deepContext: DeepSparkContext = new DeepSparkContext(sc)

// Job configuration for reading DBObjects from MongoDB
val inputConfigEntity: MongoDeepJobConfig[DBObject] = new MongoDeepJobConfig[DBObject](classOf[DBObject])

// Build the filter: { "number" : { "$gt" : 27 , "$lt" : 30 } }
val query: QueryBuilder = QueryBuilder.start()
query.and("number").greaterThan(27).lessThan(30)

inputConfigEntity.host(host).database(database).collection(inputCollection).filterQuery(query).setExtractorImplClass(classOf[MongoNativeDBObjectExtractor])

// Create the RDD of matching documents
val inputRDDEntity: RDD[DBObject] = deepContext.createRDD(inputConfigEntity)

The best thing about this is that you can use a QueryBuilder object to build your queries.

You can also pass a DBObject like this:

{ "number" : { "$gt" : 27 , "$lt" : 30}}

If you want to iterate, you can use the method yourRDD.collect(). You can also use yourRDD.foreach, but you have to provide a function.
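
For example, a quick sketch of both approaches on the RDD built above:

// Bring all matching documents to the driver, then iterate locally
inputRDDEntity.collect().foreach { doc =>
  println(doc.get("number"))
}

// Or apply a function to each document, distributed across the executors
inputRDDEntity.foreach(doc => println(doc))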

There is another way to add jars to Spark. You can modify spark-env.sh and put these lines at the end:

CONFDIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
for jar in $(ls $CONFDIR/../lib/*.jar); do
  SPARK_CLASSPATH=$SPARK_CLASSPATH:${jar}
done

Put your libraries inside the lib folder and that is all.

Disclaimer: I currently work at Stratio.

answered Sep 21 '22 by Ricardo Crespo


1) In order to add conditions to your query, just add them to the dictionary provided in 'mongo.input.query':

config.set("mongo.input.query","{customerId: 'some mongo id', usage: {'$gt': 30}")

To better understand how queries work, refer to:

http://docs.mongodb.org/manual/tutorial/query-documents/

http://docs.mongodb.org/getting-started/python/query/

2) For iterating over the result you may want to take a look at the Spark RDD method 'collect'; more info in this link (just look for the collect method):

http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD
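
For example, a minimal sketch using the mongoRDD from the question (newAPIHadoopRDD yields key/value pairs, here (Object, BSONObject)):

mongoRDD.collect().foreach { case (id, doc) =>
  // doc is an org.bson.BSONObject; read fields by name
  println(doc.get("usage"))
}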

answered Sep 18 '22 by msemelman