
Delete from Cassandra table in Spark

I'm using Spark with Cassandra, and I'm reading some rows from my table in order to delete them using the primary key. This is my code:

val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
  select("a","b","c","d").
  where("d=?", d).cache()

lines.foreach(r => {
    val session: Session = connector.openSession
    val delete = s"DELETE FROM $CASSANDRA_SCHEMA.$table WHERE channel='${r._1}' AND ctid='${r._2}' AND cvid='${r._3}';"
    session.execute(delete)
    session.close()
})

But this method creates a session for each row and takes a lot of time. Is it possible to delete my rows using sc.cassandraTable, or is there a better solution than mine?

Thank you

Amine CHERIFI asked Jan 08 '23 21:01

1 Answer

I don't think the Cassandra Connector supports deletes at the moment. To amortize the cost of connection setup, the recommended approach is to apply the operation once per partition rather than once per row.

So your code will look like this:

lines.foreachPartition { partition =>
    // Open one session per partition instead of one per row.
    val session: Session = connector.openSession
    try {
        partition.foreach { elem =>
            val delete = s"DELETE FROM $CASSANDRA_SCHEMA.$table WHERE channel='${elem._1}' AND ctid='${elem._2}' AND cvid='${elem._3}';"
            session.execute(delete)
        }
    } finally {
        session.close() // release the session even if a delete fails
    }
}

You could also look into using DELETE FROM ... WHERE pk IN (list), building up the list for each partition in a similar way. This should be even faster, but it might break with very large partitions, as the list will become correspondingly long. Repartitioning your target RDD before applying this function will help.
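As a rough sketch of the IN-list idea, the helper below turns the keys of one partition into DELETE statements with a bounded number of keys per list, which sidesteps the very long list problem. It is plain Scala with no Spark or Cassandra dependency. The names InListDeletes, quote, buildDeletes and chunkSize are hypothetical, and it assumes a single text key column called pk, as in the statement above; a composite key like the one in the question would need a different WHERE clause.

```scala
object InListDeletes {
  // Escape a CQL string literal by doubling any single quotes inside it.
  def quote(value: String): String =
    "'" + value.replace("'", "''") + "'"

  // Turn one partition's keys into DELETE statements holding at most
  // `chunkSize` keys per IN list, so no single statement grows unbounded.
  // Assumption: the table has a single text key column named `pk`.
  def buildDeletes(keyspace: String,
                   table: String,
                   keys: Iterator[String],
                   chunkSize: Int): Iterator[String] =
    keys.grouped(chunkSize).map { chunk =>
      val inList = chunk.map(quote).mkString(", ")
      s"DELETE FROM $keyspace.$table WHERE pk IN ($inList);"
    }
}
```

Inside foreachPartition, the resulting statements could then be run one by one on the per-partition session, for example with a loop that passes each string to session.execute. The right chunk size depends on the cluster and the data, so it is best treated as something to tune.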

maasg answered Jan 17 '23 17:01