 

Spark Mongodb Connector Scala - Missing database name

I'm stuck with a weird issue. I'm trying to connect Spark to a local MongoDB instance using the MongoDB Spark connector.

Apart from setting up Spark, I'm using the following code:

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))

// Load the movie rating data from Mongo DB
val movieRatings = MongoSpark.load(sc, readConfig).toDF()

movieRatings.show(100)

However, I get the following error on the line where I set up readConfig:

java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property.

I don't get why it complains about an unset uri when I clearly have a uri property in the Map. I might be missing something.

asked Oct 18 '17 by Daniil Andreyevich Baunov

2 Answers

You can do it via the SparkSession builder, as mentioned here:

val spark = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate()

Create a DataFrame using the config:

val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", "readPreference.name" -> "secondaryPreferred"))
val df = MongoSpark.load(spark, readConfig)

Write df back to MongoDB:

MongoSpark.save(
  df.write
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .mode("overwrite"))
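
Equivalently, the save can be driven by an explicit WriteConfig rather than DataFrameWriter options. This is a sketch assuming the same local deployment and the connector's 2.x Scala API:

```scala
// Sketch: save df using an explicit WriteConfig (assumes the same
// local MongoDB instance and database/collection as above).
val writeConfig = WriteConfig(Map(
  "uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))

MongoSpark.save(df, writeConfig)
```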

In your code, the spark.mongodb. prefixes are missing from the config keys:

val readConfig = ReadConfig(Map(
    "spark.mongodb.input.uri" -> "mongodb://localhost:27017/movie_db.movie_ratings", 
    "spark.mongodb.input.readPreference.name" -> "secondaryPreferred"), 
    Some(ReadConfig(sc)))

val writeConfig = WriteConfig(Map(
    "spark.mongodb.output.uri" -> "mongodb://127.0.0.1/movie_db.movie_ratings"))
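
Alternatively, ReadConfig also accepts separate database and collection keys (here movie_db / movie_ratings, taken from the URI above), which sidesteps the prefix question. A sketch, assuming spark.mongodb.input.uri was already set on the SparkContext (e.g. via the SparkSession builder shown earlier) so the default config can supply the connection string:

```scala
// Sketch: database/collection split out into their own keys.
// Assumes the connection URI comes from the SparkContext defaults.
val readConfig = ReadConfig(Map(
  "database" -> "movie_db",
  "collection" -> "movie_ratings",
  "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))

val movieRatings = MongoSpark.load(sc, readConfig).toDF()
```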
answered Oct 03 '22 by mrsrinivas

For Java, you can either set the configs while creating the SparkSession, or create the session first and then set them as runtime configs.

1.

SparkSession sparkSession = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnector")
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/movie_db.movie_ratings")
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movie_db.movie_ratings")
    .getOrCreate();

OR

2.

SparkSession sparkSession = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnector")
    .getOrCreate();

Then,

String mongoUrl = "mongodb://localhost:27017/movie_db.movie_ratings";
String sourceCollection = "movie_ratings"; // collection name, as in the URI above
sparkSession.sparkContext().conf().set("spark.mongodb.input.uri", mongoUrl);
sparkSession.sparkContext().conf().set("spark.mongodb.output.uri", mongoUrl);
Map<String, String> readOverrides = new HashMap<String, String>();
readOverrides.put("collection", sourceCollection);
readOverrides.put("readPreference.name", "secondaryPreferred");
ReadConfig readConfig = ReadConfig.create(sparkSession).withOptions(readOverrides);
Dataset<Row> df = MongoSpark.loadAndInferSchema(sparkSession, readConfig);
answered Oct 03 '22 by Vignesh Jaganathan