Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to achieve an insert across multiple tables in Cassandra, uniquely and atomically?

I have a data model where the domain object has two fields that must both be unique and the object must be gettable via both independently. One of them is randomly generated, so we can assume no possible collisions. The other is user-selected. Here's what I've come up with:

CREATE TABLE object_primary (
    generated_value text PRIMARY KEY,
    data blob
);

CREATE TABLE object_unique_index (
    user_value text PRIMARY KEY,
    generated_value text
);

Here I'm using object_unique_index as both an index into the primary table and a resource lock, where the resource is the user-chosen globally unique value.

First thoughts:

  • Inserting into object_unique_index must use IF NOT EXISTS. Therefore I cannot use a batch.
  • Inserting into object_primary doesn't as uniqueness is already guaranteed by the generator. This then lets me use custom TIMESTAMPs, avoiding a read-back on creation.
  • If the first insert fails, I shouldn't continue with the second insert.
  • If the second insert fails, I should rollback the first insert.
  • The system should not be left in a state where a row exists in either column without the other.
  • I'm willing to ignore errors during rollback, and will delegate clean-up to an external (possibly manual) process.

It seems clear how to proceeded, but I am struggling with interpreting certain error cases for a non-conditional update. All existing descriptions assume you don't care what the end result is, and will just try the write again later.

UnavailableException: There weren't enough nodes up for a quorum, but when they come back on-line the saved hint will re-run the write. Does this mean that the eventual state will be that the write succeeded? If so, what read consistency level allows me to see it? If not, how do I know what the eventual state will be?

CassandraWriteTimeoutException: There were enough nodes for a quorum, but some of them didn't reply in time. As far as I can tell this is just a more ambiguous version of UnavailableException. Are there any differences in how it should be handled?

A lot of my confusion is coming from the conflicting statements here:

The coordinator can force the results towards either the pre-update or post-update state.

[...]

the coordinator stores the update locally, and will re-send it to the failed replica when it recovers, thus forcing it to the post-update state that the client wanted originally

So when does it force it to the pre-update state? How can I tell whether it ends up in post-update (so I ignore it) or pre-update (so I rollback the first insert)?

Is there a way to solve this without requiring all inserts to be conditional, and thus adding more performance penalties and losing the ability to set the writetime?

like image 349
OrangeDog Avatar asked Jan 15 '16 17:01

OrangeDog


2 Answers

The DataStax blog article on Cassandra error handling done right covers most of the topics raised in this question, and I will be referring to parts of that article throughout this answer.

The system should not be left in a state where a row exists in either column without the other.

Use an atomic batch encompassing the writes to both tables. Do not use compare-and-set (CAS) operations like IF NOT EXISTS inside the batch. I'll cover that later.

Cassandra 1.2 introduced atomic batches, which rely on a batch log to guarantee that all the mutations in the batch will eventually be applied. This atomicity is useful as it allows developers to execute several writes on different partitions without having to worry on which part will be applied and which won’t: either all or none of the mutations will be eventually written.

It is important to note that it does not give you control over exactly when the write becomes visible, but either all parts of the batch will (eventually) or none of them will (ever).

The user-selected value must be unique. (System-assigned value is assumed to always be unique already.)

Lightweight transactions (another term for CAS) is the only way to guarantee uniquenes, as you know. I recommend creating a third table dedicated to the purpose of conditional writes defined similarly to how you defined object_unique_index in your question; I'll call it unique_user_value_for_insert. By dedicating a table for the insertion path only then no other application logic would "see" an inconsistent state due to race conditions because nothing else should be reading from this table; its sole use is for the IF NOT EXISTS check. (I am making the assumption that both tables in the batch are used for read operations by normal application logic.)

INSERT user_value, generated_value INTO unique_user_value_for_insert IF NOT EXISTS;

If this insert returns a result set where [applied]=false, then the user-provided name was not unique and you should not try the batch insert. If the result set indicates [applied]=true then execute the batch.

With this CAS insert and the batch above, your normal, "happy" paths through this logic should be covered. We still need to handle the possible exceptional paths.

UnavailableException

When a request reaches the coordinator and there is not enough replica alive to achieve the requested consistency level, the driver will throw an UnavailableException. If you look carefully at this exception, you’ll notice that it’s possible to get the amount of replicas that were known to be alive when the error was triggered, as well as the amount of replicas that where required by the requested consistency level.

I cannot speak as an authority on the exceptions, but this description makes it sound like the coordinator node throws this exception before any attempt to execute the operation. If true, then a failure on the initial CAS insert needs no recovery action, beyond your application recognizing that the insert did not succeed for a reason other than a uniqueness violation. A failure on the atomic batch (after the CAS insert) would suggest that you need to "undo" the CAS insert.

I'd undo by sending a DELETE with a very permissive write Consistency Level like CL.ANY to make sure the delete has the best chance of getting persisted/replayed to any available replicas that may have executed the write. If that fails, your cluster is not healthy.

CassandraWriteTimeoutException

If a write timeouts at the coordinator level, there is no way to know whether the mutation has been applied or not on the non-answering replica. ... [T]he way this error will be handled will thus depends on whether the write operation was idempotent (which is the case of most statements in CQL) or not (for counter updates, and append/prepend updates on lists).

The way to handle this varies dramatically based on which of the above two operations failed, and the information presented in the exception. I regret having to quote so much from the blog, but it has very good explanations. First, if the CAS insert operation fails with this exception:

If the paxos phase fails, the driver will throw a WriteTimeoutException with a WriteType.CAS as retrieved with WriteTimeoutException#getWriteType(). In this situation you can’t know if the CAS operation has been applied so you need to retry it in order to fallback on a stable state. Because lightweight transactions are much more expensive that regular updates, the driver doesn’t automatically retry it for you. The paxos phase can also lead to an UnavailableException if not enough replicas are available. In this situation, retries won’t help as only SERIAL and LOCAL_SERIAL consistencies are available.

This is perhaps the most complicated failure in your scenario. Since "you can't know if the CAS operation has been applied" then retrying IF NOT EXISTS is ambiguous in some cases. If you try again and it succeeds that's the best case; the retried inserted value is still unique and you can proceed with the batch. If IF NOT EXISTS fails then there are two possibilities:

  1. The original failed CAS operation partially succeeded writing to one or more replicas.
  2. The value was not actually unique and the original operation should've returned [applied]=false].

I don't believe you can disambiguate between these cases without doing a read operation on some other authoritative state. If you take my recommendation to use that "third" insert-only table for CAS, then query the other table(s) to see if there is existing data for that name.

Or, if the CAS insert fails during the commit phase:

The commit phase is then similar to regular Cassandra writes in the sense that it will throw an UnavailableException or a WriteTimeoutException if the amount of required replicas or acknowledges isn’t met. In this situation rather than retrying the entire CAS operation, you can simply ignore this error if you make sure to use setConsistencyLevel(ConsistencyLevel.SERIAL) on the subsequent read statements on the column that was touched by this transaction, as it will force Cassandra to commit any remaining uncommitted Paxos state before proceeding with the read. That being said, it probably won’t be easy to organize an application to use SERIAL reads after a CAS write failure, so you may prefer another alternative such as an entire retry of the CAS operation.

The above information also seems to apply to failures from this exception in the batch case too, as closer to a "regular" write than a Paxos transaction, with this additional information:

If a timeout occurs when a batch is executed, the developer has different options depending on the type of write that timed out (see WriteTimeoutException#getWriteType()):
 

BATCH_LOG: a timeout occurred while the coordinator was waiting for the batch log replicas to acknowledge the log. Thus the batch may or may not be applied. By default, the driver will retry the batch query once, when it’s notified that such a timeout occurred. So if you receive this error, you may want to retry again, but that’s already a bad smell that the coordinator has been unlucky at picking replicas twice.

BATCH: a timeout occurred while reaching replicas for one of the changes in an atomic batch, after an entry has been successfully written to the batch log. Cassandra will thus ensure that this batch will get eventually written to the appropriate replicas and the developer doesn’t have to do anything. Note however that this error still means that all the columns haven’t all been updated yet. So if the immediate consistency of these writes is required in the business logic to be executed, you probably want to consider an alternate end, or a warning message to the end user.

UNLOGGED_BATCH: the coordinator met a timeout while reaching the replicas for a write query being part of an unlogged batch. This batch isn’t guaranteed to be atomic as no batch log entry is written, thus the parts of the batch that will or will not be applied are unknown. A retry of the entire batch will be required to fall back on a known state.

Alternatively, make the application robust enough to handle inconsistent state.

In my own applications, I did not bother with a third table. My process was:

  1. Insert the record(s) keyed by the system-generated unique value without worrying about uniqueness.
  2. Insert the record that points to the previous ID given the user-generated value as a key, conditional on IF NOT EXISTS. This should be the last step performed.
  3. On any failure, attempt to undo the previous operations by removing the data. In this case I don't expose the system generated IDs to the user except on complete success, and (in my application) nothing scans the system-generated IDs so my worst-case scenario is "dangling" data wasting space.
  4. Implement a periodic, background scrubbing task to look for and fix inconsistencies.
like image 185
William Price Avatar answered Nov 12 '22 08:11

William Price


tl;dr of William Price's answer

It's only the unconditional insert that has any doubt, so switch the order of operations and assume failure on any exception. As long as you never give out a possibly failed system-generated id, it can never be used, so it doesn't matter if the corresponding user-generated value isn't unique.

  1. Insert the record keyed by the system-generated unique value without worrying about uniqueness.
  2. On any error, delete the record with a consistency level of ANY and fail the whole operation.
  3. Insert the record that points to the previous ID given the user-generated value as a key, conditional on IF NOT EXISTS.
  4. As previously described, determine whether it actually succeeded, and perform the same rollback of the previous insert in step 2 if not
  5. Implement a periodic, background scrubbing task to look for and fix inconsistencies.

So don't need to worry about interpreting ambiguous exceptions at all.

like image 44
OrangeDog Avatar answered Nov 12 '22 08:11

OrangeDog