I want to create a standalone scala code that uses a custom setting to read from MongoDB using this code in MongoDB website.
When I run SBT package, I face some errors. I guess it is related to wrong creation method of SparkSession. Can you please give me a hint to fix it?
My Buid.sbt
content
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
"org.apache.spark" %% "spark-core" % "2.4.1",
"org.apache.spark" %% "spark-sql" % "2.4.1"
)
Firstapp.scala
code
package com.mongodb
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config.{ReadConfig,WriteConfig}
import com.mongodb.spark.MongoSpark
import org.bson.Document
object FirstApp {
def main(args: Array[String]) {
val sc = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count)
println(customRdd.first.toJson)
}
}
and the error after running sbt package
value toJson is not a member of org.apache.spark.sql.Row
[error] println(customRdd.first.toJson)
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 10 s, completed Jun 10, 2020 6:10:50 PM
EDIT1:
I tried the solution but it does not compile properly. The Buid.sbt
content is the same as above. I changed SimpleApp.scala
into:
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession
object FirstApp {
def main(args: Array[String]) {
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
val sc = spark.sparkContext
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc)
println(customRdd.count())
println(customRdd.first.toJson)
}
}
The compilation result:
$ spark-submit --class "FirstApp" --master local[4] target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar
20/06/12 07:09:53 WARN Utils: Your hostname, Project resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
20/06/12 07:09:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/06/12 07:09:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/06/12 07:09:54 INFO SparkContext: Running Spark version 2.4.5
20/06/12 07:09:54 INFO SparkContext: Submitted application: MongoSparkConnectorIntro
20/06/12 07:09:55 INFO SecurityManager: Changing view acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing view acls groups to:
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls groups to:
20/06/12 07:09:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sadegh); groups with view permissions: Set(); users with modify permissions: Set(sadegh); groups with modify permissions: Set()
20/06/12 07:09:55 INFO Utils: Successfully started service 'sparkDriver' on port 33031.
20/06/12 07:09:55 INFO SparkEnv: Registering MapOutputTracker
20/06/12 07:09:55 INFO SparkEnv: Registering BlockManagerMaster
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/06/12 07:09:55 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7405e1be-08e8-4f58-b88e-b8f01f8fe87e
20/06/12 07:09:55 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/06/12 07:09:55 INFO SparkEnv: Registering OutputCommitCoordinator
20/06/12 07:09:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
20/06/12 07:09:55 INFO Utils: Successfully started service 'SparkUI' on port 4041.
20/06/12 07:09:56 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.2.15:4041
20/06/12 07:09:56 INFO SparkContext: Added JAR file:/Folder/target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar at spark://10.0.2.15:33031/jars/root-2_2.11-0.1.0-SNAPSHOT.jar with timestamp 1591938596069
20/06/12 07:09:56 INFO Executor: Starting executor ID driver on host localhost
20/06/12 07:09:56 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42815.
20/06/12 07:09:56 INFO NettyBlockTransferService: Server created on 10.0.2.15:42815
20/06/12 07:09:56 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/06/12 07:09:56 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.2.15:42815 with 366.3 MB RAM, BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.2.15, 42815, None)
Exception in thread "main" java.lang.NoClassDefFoundError: com/mongodb/spark/config/ReadConfig$
at FirstApp$.main(SimpleApp.scala:16)
at FirstApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.config.ReadConfig$
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 14 more
20/06/12 07:09:56 INFO SparkContext: Invoking stop() from shutdown hook
20/06/12 07:09:56 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4041
20/06/12 07:09:56 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/06/12 07:09:56 INFO MemoryStore: MemoryStore cleared
20/06/12 07:09:56 INFO BlockManager: BlockManager stopped
20/06/12 07:09:56 INFO BlockManagerMaster: BlockManagerMaster stopped
20/06/12 07:09:56 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/06/12 07:09:56 INFO SparkContext: Successfully stopped SparkContext
20/06/12 07:09:56 INFO ShutdownHookManager: Shutdown hook called
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-7f90ac08-403c-4a3f-bb45-ea24a347c380
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-78cb32aa-c6d1-4ba4-b94f-16d3761d181b
EDIT2:
I added .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
to SimpleApp.scala
but the error remains the same as EDIT1 section:
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession
object FirstApp {
def main(args: Array[String]) {
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
.getOrCreate()
val sc = spark.sparkContext
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc)
println(customRdd.count())
println(customRdd.first.toJson)
}
}
Here is the detail steps to create a Scala Project to read the data from MongoDB with Apache spark
You can create a project with IDE or manually with the following files included
project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
build.sbt
name := "SparkMongo"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.4.1"
val mongoSparkVersion = "2.4.1"
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector" % mongoSparkVersion ,
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)
assemblyJarName in assembly := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar"
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
SparkMongo/src/main/scala/com/test/FirstMongoSparkApp.scala
package com.test
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession
object FirstMongoSparkApp extends App {
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkProject")
.config("spark.mongodb.input.uri", "mongodb://localhost/test.cities")
.config("spark.mongodb.output.uri", "mongodb://localhost/test.outputCities")
.getOrCreate()
import spark.implicits._
val readConfig = ReadConfig(Map("collection" -> "cities", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(spark.sparkContext)))
val customRdd = MongoSpark.load(spark.sparkContext, readConfig)
customRdd.toDF().show(false)
}
Now you can perform sbt assembly
will generate a jar file SparkMongo_2.11-0.1.jar
You can run the jar file as
spark-submit --class "com.test.FirstMongoSparkApp" --master "local" target/scala-2.11/SparkMongo_2.11-0.1.jar
To run without issues make sure you have the same version of spark as in the dependency, In this case 2.4.1 and mongoDB version 2.6+
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With