How to delete rows in a database with Spark?

I know how to insert rows:

    df.write \
        .format('jdbc') \
        .option("url", url) \
        .option("dbtable", table) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .mode('append') \
        .save()

But how do I delete rows? For example, given a DataFrame of ids:

    df = [Row(id=1), Row(id=2), ... ]

I would like the equivalent of:

    DELETE FROM table WHERE id IN (the ids in df)

Is this possible?

asked Oct 30 '25 05:10 by zenyatta


1 Answer

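Spark's JDBC writer doesn't support deletes; it can only write rows. I've done it with foreachPartition instead: open a JDBC connection inside each partition and run the statements yourself, using the DataFrame's rows as the parameters.

The pattern is the same as in the answer to "Does Apache Spark SQL support MERGE clause?":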

    import java.sql.{Connection, DriverManager, PreparedStatement}

    // brConnect is assumed to be a broadcast java.util.Properties with the JDBC settings.
    df.rdd.coalesce(2).foreachPartition { partition =>
      val connectionProperties = brConnect.value
      val jdbcUrl = connectionProperties.getProperty("jdbcurl")
      val user = connectionProperties.getProperty("user")
      val password = connectionProperties.getProperty("password")
      val driver = connectionProperties.getProperty("Driver")
      Class.forName(driver)
      val dbc: Connection = DriverManager.getConnection(jdbcUrl, user, password)
      dbc.setAutoCommit(false) // needed for the explicit commit() after each batch
      val dbBatchSize = 1000
      // For a delete, use "DELETE FROM employee WHERE id = ?" and bind only the id.
      val sqlString = "INSERT INTO employee VALUES (?, ?, ?, ?)"
      val pstmt: PreparedStatement = dbc.prepareStatement(sqlString)
      try {
        partition.grouped(dbBatchSize).foreach { batch =>
          batch.foreach { row =>
            // Column types are assumed from the setters used here.
            pstmt.setLong(1, row.getAs[Long]("id"))
            pstmt.setString(2, row.getAs[String]("fname"))
            pstmt.setString(3, row.getAs[String]("lname"))
            pstmt.setString(4, row.getAs[String]("userid"))
            pstmt.addBatch()
          }
          // Send the whole batch in one round trip, then commit it.
          pstmt.executeBatch()
          dbc.commit()
        }
      } finally {
        pstmt.close()
        dbc.close()
      }
    }
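
Since the question uses PySpark, here is a rough Python sketch of the same per-partition idea applied to deletes. It is only an illustration under stated assumptions: `sqlite3` stands in for the real database driver so the example runs on its own, and `open_connection`, `make_delete_partition`, and the `employee` table layout are hypothetical names, not part of Spark or of the code above.

```python
import os
import sqlite3
import tempfile
from collections import namedtuple


def open_connection(db_path):
    # Hypothetical helper: sqlite3 stands in for the real driver here.
    # With PostgreSQL you would open the connection with your driver of
    # choice instead, and the placeholder would be %s rather than ?.
    return sqlite3.connect(db_path)


def make_delete_partition(db_path, table="employee", batch_size=1000):
    """Build a function that deletes the ids found in one partition."""
    # The table name is interpolated, so it must be a trusted constant.
    sql = f"DELETE FROM {table} WHERE id = ?"

    def delete_partition(rows):
        # One connection per partition, opened on the executor side.
        conn = open_connection(db_path)
        try:
            cur = conn.cursor()
            batch = []
            for row in rows:
                batch.append((row.id,))
                if len(batch) >= batch_size:
                    cur.executemany(sql, batch)
                    conn.commit()
                    batch = []
            if batch:  # flush the final, partially filled batch
                cur.executemany(sql, batch)
                conn.commit()
        finally:
            conn.close()

    return delete_partition


def demo():
    """Run the delete against a throwaway SQLite file, without Spark."""
    db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE employee (id INTEGER PRIMARY KEY, fname TEXT)")
    conn.executemany(
        "INSERT INTO employee VALUES (?, ?)",
        [(i, f"name{i}") for i in range(1, 7)],
    )
    conn.commit()
    conn.close()

    # namedtuple stands in for pyspark.sql.Row, which also allows row.id.
    Row = namedtuple("Row", ["id"])
    partition = iter([Row(id=1), Row(id=2), Row(id=3)])
    make_delete_partition(db_path, batch_size=2)(partition)

    conn = sqlite3.connect(db_path)
    remaining = [r[0] for r in conn.execute("SELECT id FROM employee ORDER BY id")]
    conn.close()
    return remaining


if __name__ == "__main__":
    print(demo())
```

In an actual Spark job, the function returned by `make_delete_partition` would be passed to `df.foreachPartition(...)`, with `open_connection` swapped for a real connection to your database. Each partition then opens its own connection on the executor, so it is worth coalescing to a small number of partitions first, as the Scala code does.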
answered Nov 01 '25 19:11 by zenyatta


