I'm using Spark with Cassandra, and I'm reading some rows from my table in order to delete them using the primary key. This is my code:
val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
  select("a", "b", "c", "d").
  where("d = ?", d).cache()
lines.foreach(r => {
  // a new session is opened (and closed) for every single row
  val session: Session = connector.openSession
  val delete = s"DELETE FROM $CASSANDRA_SCHEMA.$table WHERE channel='${r._1}' AND ctid='${r._2}' AND cvid='${r._3}';"
  session.execute(delete)
  session.close()
})
But this approach creates a session for each row, and it takes a lot of time. Is it possible to delete my rows using sc.cassandraTable, or is there a better solution than mine?
Thank you
I don't think the Cassandra Connector supports deletes at the moment. To amortize the cost of connection setup, the recommended approach is to apply the operation once per partition rather than once per row.
So your code will look like this:
lines.foreachPartition(partition => {
  val session: Session = connector.openSession // once per partition
  partition.foreach { elem =>
    val delete = s"DELETE FROM $CASSANDRA_SCHEMA.$table WHERE channel='${elem._1}' AND ctid='${elem._2}' AND cvid='${elem._3}';"
    session.execute(delete)
  }
  session.close()
})
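A variant of the per-partition approach, sketched under some assumptions: it prepares the DELETE once per partition and binds each row's key values, which avoids building CQL by string concatenation (and the quoting and spacing mistakes that come with it). It assumes the spark-cassandra-connector's CassandraConnector with its withSessionDo method and the DataStax driver's prepare/bind calls. The names DeleteByKey, deleteCql and deleteKeys are hypothetical; the key columns channel, ctid and cvid are taken from the question.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.rdd.RDD

// Sketch only: assumes spark-cassandra-connector is on the classpath.
// DeleteByKey, deleteCql and deleteKeys are hypothetical names.
object DeleteByKey {

  // Parameterized DELETE on the question's key columns (channel, ctid, cvid).
  def deleteCql(keyspace: String, table: String): String =
    s"DELETE FROM $keyspace.$table WHERE channel = ? AND ctid = ? AND cvid = ?"

  // Deletes every (channel, ctid, cvid) key in `keys` from keyspace.table.
  // CassandraConnector is serializable, so it can be captured in the closure.
  def deleteKeys(connector: CassandraConnector,
                 keyspace: String,
                 table: String,
                 keys: RDD[(String, String, String)]): Unit = {
    val cql = deleteCql(keyspace, table)
    keys.foreachPartition { partition =>
      // One session borrowed per partition instead of one per row.
      connector.withSessionDo { session =>
        // Prepared once per partition, then reused for every row in it.
        val prepared = session.prepare(cql)
        partition.foreach { case (channel, ctid, cvid) =>
          session.execute(prepared.bind(channel, ctid, cvid))
        }
      }
    }
  }
}
```

With the question's names, a call might look like DeleteByKey.deleteKeys(CassandraConnector(sc.getConf), CASSANDRA_SCHEMA, table, lines.map(r => (r._1, r._2, r._3))).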
You could also look into using DELETE FROM ... WHERE pk IN (list) and use a similar approach to build up the list for each partition. This will be even more performant, but it might break with very large partitions, as the list will become correspondingly long. Repartitioning your target RDD into more (smaller) partitions before applying this function will help.
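The IN-list idea can be sketched as a pure helper that turns one partition's keys into a bounded number of statements, so a very large partition yields several moderate IN lists instead of one huge one. This assumes a single-column partition key; InListDeletes, buildDeletes, pkColumn and maxPerStatement are hypothetical names, and whether IN is accepted in a DELETE on a given column depends on your table's key layout and Cassandra version.

```scala
// Sketch only: InListDeletes, buildDeletes, pkColumn and maxPerStatement
// are hypothetical names. Assumes a single-column partition key.
object InListDeletes {

  // Splits the keys into chunks of at most `maxPerStatement` and returns,
  // for each chunk, a parameterized DELETE ... IN (?, ...) plus its values.
  def buildDeletes(keyspace: String,
                   table: String,
                   pkColumn: String,
                   keys: Iterator[String],
                   maxPerStatement: Int): Iterator[(String, Seq[String])] = {
    require(maxPerStatement > 0, "maxPerStatement must be positive")
    keys.grouped(maxPerStatement).map { chunk =>
      // One "?" placeholder per key in this chunk.
      val placeholders = chunk.map(_ => "?").mkString(", ")
      (s"DELETE FROM $keyspace.$table WHERE $pkColumn IN ($placeholders)", chunk.toSeq)
    }
  }
}
```

Inside foreachPartition, each (cql, values) pair could then be prepared and executed on the session, much as in the per-partition example; calling repartition(n) on the RDD first shortens each partition's iterator and therefore the number of chunks per task.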