I am working with Apache Spark and Cassandra, and I want to save my RDD to Cassandra with spark-cassandra-connector.
Here's the code:
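import java.util.Date
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._ // adds saveToCassandra to RDDs
def saveToCassandra(step: RDD[(String, String, Date, Int, Int)]): Unit = {
  step.saveToCassandra("keyspace", "table")
}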
This works fine most of the time, but it overwrites data that is already present in the database. I don't want to overwrite any existing data. Is that possible?
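Cassandra writes are upserts: an INSERT whose primary key already exists replaces the stored columns, so a plain saveToCassandra will overwrite matching rows. CQL also offers a conditional form, `INSERT ... IF NOT EXISTS` (a lightweight transaction), that only writes when the key is absent. The following is a pure-Scala model of the two behaviors, not connector code; `WriteSemantics`, `Row`, `upsert` and `insertIfNotExists` are hypothetical names, and an immutable `Map` keyed by `id` stands in for the table.

```scala
// Toy, in-memory model of a Cassandra table keyed by primary key (hypothetical names).
object WriteSemantics {
  final case class Row(id: String, name: String, value: Int)

  // Plain INSERT: if the key exists, the new row replaces it (last write wins).
  def upsert(table: Map[String, Row], row: Row): Map[String, Row] =
    table.updated(row.id, row)

  // INSERT ... IF NOT EXISTS: the write is applied only when the key is absent.
  def insertIfNotExists(table: Map[String, Row], row: Row): Map[String, Row] =
    if (table.contains(row.id)) table else table.updated(row.id, row)

  def main(args: Array[String]): Unit = {
    val existing = Map("a" -> Row("a", "old", 1))
    val incoming = Seq(Row("a", "new", 2), Row("b", "fresh", 3))

    val afterUpsert = incoming.foldLeft(existing)(upsert)
    val afterConditional = incoming.foldLeft(existing)(insertIfNotExists)

    println(afterUpsert("a").name)      // new
    println(afterConditional("a").name) // old
    println(afterConditional.size)      // 2
  }
}
```

In a real cluster the conditional form costs a Paxos round per write, so it is noticeably slower than a plain upsert.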
What I do is this:
rdd.foreachPartition(x => connector.withSessionDo(session => {
  // someUpdater is your own class that issues the CQL writes
  someUpdater.UpdateEntries(x, session)
  // or, one entry at a time:
  x.foreach(y => someUpdater.UpdateEntry(y, session))
}))
The connector above is CassandraConnector(sparkConf).
It's not as nice as a simple saveToCassandra, but it allows fine-grained control.
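The structure of that pattern, one session per partition reused for every entry in it, can be modeled without Spark or a cluster. This is a sketch under stated assumptions: `FakeConnector` and `FakeSession` are hypothetical stand-ins that only mimic the shape of `CassandraConnector.withSessionDo`, each inner `Seq` plays the role of one partition's iterator, and `updateEntry` is a hypothetical updater that writes a key only if it is not already stored.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a driver session; it just exposes the shared store.
final class FakeSession(val store: mutable.Map[String, Int])

// Hypothetical stand-in for CassandraConnector; counts how often a session is lent out.
final class FakeConnector {
  val store = mutable.Map.empty[String, Int]
  var sessionsOpened = 0

  // Same shape as withSessionDo: lend a session to the closure and return its result.
  def withSessionDo[T](code: FakeSession => T): T = {
    sessionsOpened += 1
    code(new FakeSession(store))
  }
}

object PerPartitionWrites {
  // Hypothetical updater: writes the entry only when its key is not stored yet.
  def updateEntry(entry: (String, Int), session: FakeSession): Unit =
    if (!session.store.contains(entry._1)) session.store(entry._1) = entry._2

  def main(args: Array[String]): Unit = {
    val connector = new FakeConnector
    connector.store("a") = 0 // pre-existing row
    val partitions = Seq(Seq("a" -> 1, "b" -> 2), Seq("c" -> 3, "d" -> 4, "e" -> 5))

    // Mirrors rdd.foreachPartition(x => connector.withSessionDo(session => ...)).
    partitions.foreach { partition =>
      connector.withSessionDo { session =>
        partition.foreach(entry => updateEntry(entry, session))
      }
    }

    println(connector.sessionsOpened) // 2: one per partition, not one per row
    println(connector.store("a"))     // 0: existing value left untouched
  }
}
```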
I think it's better to call withSessionDo outside foreachPartition instead. That call carries overhead that need not be repeated.