I have a Cassandra database whose data I analyzed using Spark SQL on Apache Spark. Now I want to insert the analyzed data into PostgreSQL. Is there any way to achieve this directly, apart from using the PostgreSQL driver? I have already done it using PostgREST and the driver; I want to know whether there is a method like saveToCassandra().
You can use the Postgres COPY API to write the data; it is much faster that way. See the following two methods: the first iterates over the RDD to fill a buffer, and the second saves that buffer through the COPY API. The only thing you have to take care of is building correctly formatted CSV rows for the COPY API to consume.
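import java.io.{Reader, StringReader}
import java.sql.SQLException
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.postgresql.PGConnection

// Collects the RDD on the driver and renders every event as one CSV line.
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
  val sb = new mutable.StringBuilder
  val now = System.currentTimeMillis()
  rdd.collect().foreach { itr =>
    itr.foreach(_.createCSV(sb, now).append("\n"))
  }
  // "statement" stands in for the column list, e.g. "(col1, col2)"
  copyIn("myTable", new StringReader(sb.toString), "statement")
}

// Streams the CSV text into the table through the PostgreSQL COPY API.
def copyIn(tableName: String, reader: Reader, columnStmt: String = ""): Unit = {
  val conn = connectionPool.getConnection()
  try {
    conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(
      s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
  } catch {
    case se: SQLException => logWarning(se.getMessage)
    case t: Throwable => logWarning(t.getMessage)
  } finally {
    conn.close()
  }
}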
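The CSV formatting is the step most likely to go wrong, so here is a minimal sketch of it, written in Python for brevity; the same quoting rules would apply inside a Scala createCSV. The function name rows_to_copy_csv and the sample rows are hypothetical and not part of the answer above. It relies on the standard csv module, which quotes fields containing commas, quotes, or newlines in the way COPY ... WITH CSV expects.

```python
import csv
import io


def rows_to_copy_csv(rows):
    """Render an iterable of row tuples as CSV text for COPY ... FROM STDIN WITH CSV.

    Hypothetical helper for illustration. None becomes an unquoted empty
    field, which COPY in CSV mode reads as NULL by default.
    """
    buf = io.StringIO()
    # lineterminator="\n" avoids the csv module's default "\r\n" line endings
    writer = csv.writer(buf, lineterminator="\n")
    for row in rows:
        writer.writerow(["" if value is None else value for value in row])
    return buf.getvalue()


if __name__ == "__main__":
    sample = [(1, "a,b", None), (2, 'say "hi"', "x")]
    print(rows_to_copy_csv(sample), end="")
```

Note that with this default quoting an empty string is also written as an unquoted empty field, so it would be loaded as NULL as well; if the distinction matters, the empty strings need to be quoted explicitly.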
The answers above refer to old Spark versions. In Spark 2.x there is a JDBC connector that lets you write directly to an RDBMS from a DataFrame.
Example:
jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
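As a slightly fuller sketch of the same call in PySpark, the helpers below build the JDBC URL and properties and then append a DataFrame to a table. The function names, host, port, and database name are assumptions made for illustration; DataFrameWriter.jdbc and its url, table, mode, and properties parameters come from the Spark API. It also assumes the PostgreSQL JDBC driver jar is available on Spark's classpath.

```python
def postgres_jdbc_options(host, port, database, user, password):
    """Build the JDBC URL and connection properties (hypothetical helper)."""
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    properties = {
        "user": user,
        "password": password,
        # Driver class shipped in the PostgreSQL JDBC jar
        "driver": "org.postgresql.Driver",
    }
    return url, properties


def write_to_postgres(df, table, url, properties, mode="append"):
    """Write a Spark DataFrame to PostgreSQL through DataFrameWriter.jdbc.

    mode="append" adds rows to an existing table; "overwrite" replaces
    the table's contents instead.
    """
    df.write.jdbc(url=url, table=table, mode=mode, properties=properties)


# Usage, assuming jdbcDF2 is an existing DataFrame:
# url, props = postgres_jdbc_options("dbserver", 5432, "mydb", "username", "password")
# write_to_postgres(jdbcDF2, "schema.tablename", url, props)
```

Passing the mode explicitly is worth the extra argument: without it the writer uses its default save mode, which raises an error if the target table already exists.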