Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Best approach to Cassandra (+ Spark?) for Continuous Queries?

We're currently using Hazelcast (http://hazelcast.org/) as a distributed in-memory data grid. That's been working sort-of-well for us, but going solely in-memory has exhausted its path in our use case, and we're considering porting our application to a NoSQL persistent store. After the usual comparisons and evaluations, we're borderline close to picking Cassandra, plus eventually Spark for analytics.

Nonetheless, there is a gap in our architectural needs that we're still not grasping how to solve in Cassandra (with or without Spark): Hazelcast allows us to create a Continuous Query in that, whenever a row is added/removed/modified from the clause's resultset, Hazelcast calls up back with the corresponding notification. We use this to continuously update the clients via AJAX streaming with the new/changed rows.

This is probably a conceptual mismatch we're making, so - how to best address this use case in Cassandra (with or without Spark's help)? Is there something in the API that allows for Continuous Queries on key/clause changes (haven't found it)? Is there some other way to get a stream of key/clause updates? Events of some sort?

I'm aware that we could, eventually, periodically poll Cassandra, but in our use case, the client is potentially interested in a large number of table clause notifications (think "all changes to Ship positions on California's coastline"), and iterating out of the store would kill the streamer's scalability.

Hence, the magic question: what are we missing? Is Cassandra the wrong tool for the job? Are we not aware of a particular part of the API or external library in/outside the apache realm that would allow for this?

Many thanks for any assistance!

Hugo

like image 336
Hugo Pinto Avatar asked Jan 03 '15 10:01

Hugo Pinto


2 Answers

I'm not an expert on spark, so take this with a grain of salt, but perhaps you could use an approach like this:

  1. Use spark streaming for real time analytics of the incoming data stream and pushing position updates to clients in real time.

  2. Use Cassandra for persistent storage, cached views, and rollups of data from which clients can pull data.

So you would write a spark streaming application that connects to your incoming data stream, presumably one that reports ship positions at regular intervals. When it receives a ship position, it would look up the last known position of the ship in Cassandra (previously stored in a clustered time series of positions for that ship id, reverse sorted by timestamp so that the most recent position is the first row). If the ship position has changed, the spark application would insert a new time series row in Cassandra and push the new position to your real time client.

Spark would also be writing other updates to Cassandra for rollup things clients may want to know, such as a table for how many ships are currently in San Francisco bay. When the client clicks on the bay, the rollup table is queried to pull that data for display. Anything that needs a fast response time on the client should be pre-computed by spark and stored in Cassandra for quick retrieval.

When a new client starts up, they would first query (pull from) Cassandra to get the current position of all ships, and then real time updates to that data would be pushed from the spark application.

like image 191
Jim Meyer Avatar answered Sep 29 '22 19:09

Jim Meyer


Use spark streaming. When an update is needed, carry out two operations:

  1. Do a saveToCassandra which will update cassandra data for future queries.
  2. Push down the change to clients using whatever you're using. You could do the AJAX notification from Spark Streaming if your AJAX push can be put into the streaming client code. Otherwise, you can send a message to some proxy which would relay to the Ajax clients.

Your code could look something like this:

val notifications = ssc.whateverSourceYouHaveThatGivesADstream(...)
notifications.foreachRDD(x => {
    x.foreachPartition(x => {
      cassandraConnector.withSessionDo(session => {
          x.foreach(y => {
              //use session to update cassandra
              // broadcast via AJAX or send to proxy to broadcast
          })
      })
    })
})

Hope that helps.

like image 31
ashic Avatar answered Sep 29 '22 21:09

ashic