
Spark write to postgres slow

I'm writing roughly 83M records from a dataframe into PostgreSQL, and it's slow: the write takes about 2.7 hours to complete.

Looking at the executors, only one task is active, running on a single executor. Is there any way to parallelize the writes to the database across all executors in Spark?

...
val prop = new Properties()
prop.setProperty("user", DB_USER)
prop.setProperty("password", DB_PASSWORD)
prop.setProperty("driver", "org.postgresql.Driver")



salesReportsDf.write
              .mode(SaveMode.Append)
              .jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", REPORTS_TABLE, prop)

Thanks

Asked Sep 08 '16 by Adetiloye Philip Kehinde

People also ask

How can I speed up Spark DF write JDBC to Postgres?

Use df.repartition(n) to partition the dataframe so that each partition is written to the DB in parallel. Note: a very large number of partitions can also slow down inserts, so start with 5 partitions and increase in steps of 5 until you reach optimal performance.
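Using the identifiers from the question, that advice could look like the following sketch (the partition count of 5 is just a starting point to tune):

```scala
// Repartition before the JDBC write so multiple executors each open a
// connection and write their partition in parallel.
salesReportsDf
  .repartition(5)          // start with 5, increase in steps of 5 and measure
  .write
  .mode(SaveMode.Append)
  .jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", REPORTS_TABLE, prop)
```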


2 Answers

So I figured out the problem. Basically, repartitioning my dataframe increased the database write throughput by 100%.

def srcTable(config: Config): Map[String, String] = {

  val SERVER             = config.getString("db_host")
  val PORT               = config.getInt("db_port")
  val DATABASE           = config.getString("database")
  val USER               = config.getString("db_user")
  val PASSWORD           = config.getString("db_password")
  val TABLE              = config.getString("table")
  val PARTITION_COL      = config.getString("partition_column")
  val LOWER_BOUND        = config.getString("lowerBound")
  val UPPER_BOUND        = config.getString("upperBound")
  val NUM_PARTITION      = config.getString("numPartitions")

  Map(
    "url"     -> s"jdbc:postgresql://$SERVER:$PORT/$DATABASE",
    "driver"  -> "org.postgresql.Driver",
    "dbtable" -> TABLE,
    "user"    -> USER,
    "password" -> PASSWORD,
    "partitionColumn" -> PARTITION_COL,
    "lowerBound" -> LOWER_BOUND,
    "upperBound" -> UPPER_BOUND,
    "numPartitions" -> NUM_PARTITION
  )

}
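The partitionColumn / lowerBound / upperBound / numPartitions keys in this map are Spark's JDBC partitioned-read options; a sketch of how such a map might be consumed (assuming a SparkSession named spark and a loaded config):

```scala
// Sketch: feed the options map to Spark's JDBC reader so the source
// table is split into numPartitions parallel range scans on
// partitionColumn between lowerBound and upperBound.
val df = spark.read
  .format("jdbc")
  .options(srcTable(config))
  .load()
```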
Answered Nov 15 '22 by Adetiloye Philip Kehinde


Spark also has an option called "batchsize" for JDBC writes. Its default value is quite low (1000).

connectionProperties.put("batchsize", "100000")

Setting it to a much higher value should speed up writes to external databases.
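In context, that property would sit alongside the connection properties from the question before the write (a sketch reusing the question's names):

```scala
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", DB_USER)
connectionProperties.put("password", DB_PASSWORD)
connectionProperties.put("driver", "org.postgresql.Driver")
// More rows per JDBC batch means fewer round trips; default is 1000.
connectionProperties.put("batchsize", "100000")

salesReportsDf.write
  .mode(SaveMode.Append)
  .jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", REPORTS_TABLE, connectionProperties)
```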

Answered Nov 15 '22 by Soumyadip Ghosh