I want to know how I can do the following things in Scala.
I know how to write the code in Scala, but how do I pull the PostgreSQL connector jar into sbt so it is available when packaging?
Load data from PostgreSQL in Spark

Now we can use the same package to load data from the PostgreSQL database in Spark. The data load part will run in the Spark driver application.
Our goal is to run parallel SQL queries from the Spark workers.
Add the connector and JDBC to the libraryDependencies in build.sbt. I've only tried this with MySQL, so I'll use that in my examples, but Postgres should be much the same.
libraryDependencies ++= Seq(
  jdbc,
  "mysql" % "mysql-connector-java" % "5.1.29",
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)
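Since the question is about PostgreSQL, a plausible equivalent would swap in the org.postgresql driver; the version below is only illustrative, so use whichever matches your database:

libraryDependencies ++= Seq(
  jdbc,
  "org.postgresql" % "postgresql" % "9.4-1206-jdbc42", // version is illustrative
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)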
When you create the SparkContext, you tell it which jars to copy to the executors. Include the connector jar. A good way to do this:
val classes = Seq(
  getClass,                       // To get the jar with our own code.
  classOf[com.mysql.jdbc.Driver]  // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
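With the PostgreSQL connector the same approach should work; the driver class there is org.postgresql.Driver, so (assuming the org.postgresql dependency sketched above) the sequence becomes:

val classes = Seq(
  getClass,                       // To get the jar with our own code.
  classOf[org.postgresql.Driver]  // To get the PostgreSQL connector.
)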
Now Spark is ready to connect to the database. Each executor will run part of the query, so that the results are ready for distributed computation.
There are two options for this. The older approach is to use org.apache.spark.rdd.JdbcRDD:
val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)
Check out the documentation for the parameters. Briefly:
- The SparkContext.
- A function that creates the connection; it is called on each worker to connect to the database.
- The SQL query, which must contain two ? placeholders for the lower and upper bound of the key.
- The key range (0 to 1000 here) and the number of partitions. The range is divided among the partitions, so one partition ends up running something like SELECT * FROM BOOKS WHERE 0 <= KEY AND KEY <= 100 in the example.
- A function that converts each ResultSet row into something. In the example we convert it into a String, so you end up with an RDD[String].

Since Apache Spark version 1.3.0 another method is available through the DataFrame API. Instead of the JdbcRDD you would create an org.apache.spark.sql.DataFrame:
val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "BOOKS"))
See https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases for the full list of options (the key range and number of partitions can be set just like with JdbcRDD).
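As a sketch, partitioned loading with the 1.3 API might look like this; the option names partitionColumn, lowerBound, upperBound and numPartitions come from that guide, and the table and column are carried over from the example above:

val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "BOOKS",
  "partitionColumn" -> "KEY",  // column used to split the key range across partitions
  "lowerBound" -> "0",
  "upperBound" -> "1000",
  "numPartitions" -> "10"))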
JdbcRDD does not support updates. But you can simply do them in a foreachPartition.
rdd.foreachPartition { it =>
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
  for (bookTitle <- it) {
    del.setString(1, bookTitle)
    del.executeUpdate()
  }
  del.close()
  conn.close()
}
(This creates one connection per partition. If that is a concern, use a connection pool!)
DataFrames support updates through the createJDBCTable and insertIntoJDBC methods.
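As a minimal sketch of writing back with the 1.3 DataFrame API (as I understand it, both methods take the JDBC url, a table name, and a boolean flag; the table name here is just an example):

val url = "jdbc:mysql://mysql.example.com/?user=batman&password=alfred"
df.createJDBCTable(url, "BOOKS_COPY", false) // CREATE TABLE + INSERTs; false = throw if the table already exists
df.insertIntoJDBC(url, "BOOKS_COPY", false)  // insert into an existing table; true would truncate it first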