I'm looking for the best way to de-duplicate events using Cassandra.
I have many clients receiving event IDs (thousands per second). I need to ensure that each event ID is processed once and only once, with high reliability and high availability.
So far I've tried two methods:
Use the event ID as the partition key and do an "INSERT ... IF NOT EXISTS". If the insert is not applied, the event is a duplicate and can be dropped. This is a nice, clean approach, but throughput is not great because of Paxos, especially with higher replication factors such as 3. It's also fragile: IF NOT EXISTS always requires a quorum, and there's no way to fall back to a lower consistency level if a quorum isn't available. So a couple of down nodes will completely block some event IDs from being processed.
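To make that trade-off concrete, here is a minimal in-memory sketch of the IF NOT EXISTS approach. It does not talk to Cassandra: `LwtEventTable`, `QuorumUnavailable` and `handle` are hypothetical names, a Python lock stands in for Paxos serialization, and the replica count only models the rule that a conditional insert needs a majority of replicas. The real statement would be something like `INSERT INTO events (event_id) VALUES (?) IF NOT EXISTS` (table name assumed); with the DataStax Python driver, the result's `was_applied` attribute plays the role of the boolean returned here.

```python
import threading


class QuorumUnavailable(Exception):
    """Stand-in for the error a driver raises when a Paxos quorum is unreachable."""


class LwtEventTable:
    """In-memory stand-in for a table keyed by event_id.

    insert_if_not_exists() mimics the linearizable result of
    INSERT ... IF NOT EXISTS: exactly one caller sees True per key.
    """

    def __init__(self, replicas=3):
        self._rows = set()
        self._lock = threading.Lock()  # models Paxos serializing competing proposals
        self.replicas = replicas
        self.live_replicas = replicas

    def insert_if_not_exists(self, event_id):
        # A conditional insert needs a majority of replicas; there is no weaker fallback.
        if self.live_replicas < self.replicas // 2 + 1:
            raise QuorumUnavailable(event_id)
        with self._lock:
            if event_id in self._rows:
                return False  # not applied: duplicate
            self._rows.add(event_id)
            return True  # applied: this caller owns the event


def handle(table, event_id, processed):
    """Process the event only if our conditional insert was applied."""
    if table.insert_if_not_exists(event_id):
        processed.append(event_id)


table = LwtEventTable(replicas=3)
processed = []
threads = [threading.Thread(target=handle, args=(table, "evt-1", processed)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(processed)  # ['evt-1']

table.live_replicas = 1  # two of three replicas down
try:
    table.insert_if_not_exists("evt-2")
except QuorumUnavailable:
    print("blocked")
```

Eight racing clients produce exactly one processed event, and with two of three replicas down the insert fails outright instead of degrading, which is the fragility described above.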
Allow clients to collide on the same event ID, but then detect the collision using a clustering column. Each client inserts using the event ID as the partition key and a client-generated timeuuid as a clustering column. The client then waits a while (in case other clients are inserting the same partition key) and reads the event ID with LIMIT 1, which returns the oldest clustered row. If the timeuuid it reads back matches the one it inserted, it is the "winner" and processes the event. If the timeuuid does not match, the event is a duplicate and can be dropped.
The collision (bakery algorithm) approach has much better throughput and availability than IF NOT EXISTS, but it's more complex and feels riskier. For example, if the system clock on a client is out of whack, a duplicate event would look like a non-duplicate. All my clients and Cassandra nodes use NTP, but NTP doesn't always keep clocks perfectly synchronized.
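Both the collision approach and its clock-skew risk show up in a small deterministic simulation. This is a sketch under stated assumptions, not Cassandra client code: `Partition` and `winners` are hypothetical, each row's `client_ts` stands in for the timestamp the client embeds in its timeuuid (ascending clustering order compares that first), and `clock_offset` models how far a client's clock is from real time. Replication delay and consistency levels are ignored.

```python
from dataclasses import dataclass, field


@dataclass
class Partition:
    """Rows for one event_id partition.

    Each row is (client_ts, client_id, real_insert_time). Taking the minimum
    client_ts stands in for LIMIT 1 under ascending timeuuid clustering,
    which orders by the time the *client* put into the timeuuid.
    """
    rows: list = field(default_factory=list)

    def insert(self, client_ts, client_id, real_time):
        self.rows.append((client_ts, client_id, real_time))

    def read_limit_1(self, real_time):
        """SELECT ... WHERE event_id = ? LIMIT 1, as observed at real_time."""
        visible = [r for r in self.rows if r[2] <= real_time]
        return min(visible)[1] if visible else None


def winners(arrivals, wait):
    """arrivals: (client_id, real_arrival_time, clock_offset) tuples.

    Every client inserts on arrival, sleeps `wait`, reads LIMIT 1, and
    processes the event only if the oldest row is its own.
    """
    part = Partition()
    for client_id, t, offset in arrivals:
        part.insert(t + offset, client_id, t)
    return [c for c, t, _ in arrivals if part.read_limit_1(t + wait) == c]


# Synchronized clocks: three near-simultaneous arrivals, one winner.
print(winners([("A", 0.00, 0.0), ("B", 0.01, 0.0), ("C", 0.02, 0.0)], wait=0.5))
# ['A']

# B arrives after A has already read, but B's clock runs 2 s slow,
# so B's row sorts before A's and B also believes it won.
print(winners([("A", 0.0, 0.0), ("B", 1.0, -2.0)], wait=0.5))
# ['A', 'B']
```

In the second run, A's wait window closed before B's row existed, and B's slow clock then puts its row first, so both clients process the event. In this model a double win requires the late arrival to come after the earlier client's read yet sort before it, which is only possible when the wait is shorter than the relative clock skew between the two clients.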
Does anyone have a suggestion for which approach to use? Is there another way to do this?
Also note that my cluster will be set up with three data centers, with about 100 ms of latency between DCs.
Thanks.
IF NOT EXISTS does not scale as well as plain Cassandra writes (because coordination is slow, but you know that), but it is probably the "official, right" way to do it. There are two other methods that "work":
1) Use an external locking system (ZooKeeper, memcached CAS, etc.) that lets you handle the coordination OUTSIDE of Cassandra.
2) Use an ugly hack: an inverted-timestamp trick so that the first write wins. Rather than using a client-supplied timestamp that corresponds to actual wall time, use timestamp = MAX_LONG - (wall time). That way, the first write has the highest "timestamp" and takes precedence over subsequent writes. This method works, though it plays havoc with things like DTCS (if you're doing time series and want to use DTCS, don't use this method; DTCS will be horribly confused) and with deletion in general (if you ever want to ACTUALLY DELETE a row with a REAL tombstone, you'll have to write that tombstone with an artificial timestamp as well).
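A minimal sketch of the inverted-timestamp trick, assuming a toy `LwwCell` class (hypothetical) that models Cassandra's last-write-wins reconciliation: whichever write carries the highest timestamp is what reads return. Wall times are in microseconds, matching Cassandra's default write timestamps; against a real cluster the client would pass the inverted value with `USING TIMESTAMP` on its INSERT.

```python
MAX_LONG = 2**63 - 1  # Java Long.MAX_VALUE, the upper bound for a write timestamp


class LwwCell:
    """One cell under last-write-wins: the highest timestamp is what reads return."""

    def __init__(self):
        self.value, self.ts = None, None

    def write(self, value, ts):
        if self.ts is None or ts > self.ts:
            self.value, self.ts = value, ts


def inverted(wall_time_us):
    """Earlier wall time -> larger write timestamp, so the first write wins."""
    return MAX_LONG - wall_time_us


# (client, wall-clock time of its write in microseconds); values are illustrative
writes = [("client-A", 1_000_000), ("client-B", 1_000_250), ("client-C", 1_000_900)]

normal, flipped = LwwCell(), LwwCell()
for client, wall_us in writes:
    normal.write(client, wall_us)             # ordinary wall-clock timestamp
    flipped.write(client, inverted(wall_us))  # inverted timestamp

print(normal.value)   # client-C  (last write wins)
print(flipped.value)  # client-A  (first write wins)
```

Because reconciliation only compares timestamps, the outcome is the same whatever order the replicas receive the writes in. The same arithmetic explains the deletion caveat above: a DELETE carrying a normal wall-clock timestamp is far smaller than MAX_LONG minus the wall time, so it would lose to the inverted write.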
It's worth noting that there have been attempts to address the 'last-write-always-wins' nature of Cassandra, for example CASSANDRA-6412 (which I had working at one point, and will likely pick up again in the next month or so).
This might be a digression, but have you tried distributed Redis locks (http://redis.io/topics/distlock), sharded by event_id, with Twemproxy as a proxy in front of Redis if your load is too high?
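As a rough illustration of that suggestion, the sketch below routes each event_id to one of several Redis shards and takes a single-instance lock there using the `SET key value NX PX ttl` pattern from the distlock page, releasing only when the stored random value matches. Everything runs in memory: `FakeRedis` and `ShardedLocks` are hypothetical stand-ins, and the CRC32-modulo routing is an assumption; Twemproxy's real routing depends on its configured `hash` and `distribution` settings. It does not implement the multi-master Redlock variant described on the same page.

```python
import time
import uuid
import zlib


class FakeRedis:
    """In-memory stand-in for one Redis server supporting SET key value NX PX ttl."""

    def __init__(self):
        self._data = {}  # key -> (value, expires_at)

    def set_nx_px(self, key, value, ttl_ms):
        now = time.monotonic()
        current = self._data.get(key)
        if current is not None and current[1] > now:
            return False  # key exists and has not expired: NX fails
        self._data[key] = (value, now + ttl_ms / 1000)
        return True

    def delete_if_value(self, key, value):
        # Mirrors the compare-then-DEL script: only the lock owner may release it.
        if key in self._data and self._data[key][0] == value:
            del self._data[key]
            return True
        return False


class ShardedLocks:
    """Routes each event_id to one shard (hypothetical CRC32-modulo routing)."""

    def __init__(self, shards):
        self.shards = shards

    def shard_for(self, event_id):
        return self.shards[zlib.crc32(event_id.encode()) % len(self.shards)]

    def try_acquire(self, event_id, ttl_ms=30_000):
        token = uuid.uuid4().hex  # random value so that release is owner-only
        ok = self.shard_for(event_id).set_nx_px("lock:" + event_id, token, ttl_ms)
        return token if ok else None

    def release(self, event_id, token):
        return self.shard_for(event_id).delete_if_value("lock:" + event_id, token)


locks = ShardedLocks([FakeRedis() for _ in range(4)])
first = locks.try_acquire("evt-42")   # winner gets a token
second = locks.try_acquire("evt-42")  # duplicate gets None while the lock is held
print(first is not None, second is None)  # True True
```

For de-duplication rather than short-lived mutual exclusion, the winner would probably keep the key (or give it a long TTL) after processing instead of releasing it; otherwise a late duplicate could acquire the freed lock and process the event again.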