Given a result stream with many items, I want to store them and handle potential concurrency conflicts:
public void onTriggerEvent(/* params */) {
Stream<Result> results = customThreadPool.submit(/*...complex parallel computation on multiple servers...*/).get();
List<Result> conflicts = store(results);
resolveConflictsInNewTransaction(conflicts);
}
I am stuck on how to approach implementing store(...) efficiently. The Result consists of two immutable and detached objects describing data that needs to be updated in their respective DB tables.
@Value
public static class Result {
A a; // describes update for row in table a
B b; // describes update for row in table b
}
A and B each reference two users, where (u1, u2) is a key on the respective DB table.
@Value
public static class A {
long u1;
long u2;
// ... computed data fields ...
}
// B accordingly
The stream calculation itself might be triggered concurrently (multiple onTriggerEvent invocations in parallel), which is mostly fine but sometimes results in conflicts for some results (about 0.1% are in conflict, e.g. a stream has a result for (53,21) and another invocation also updated (53,21) in the meantime). A conflict on A and/or B is indicated by their updatedAt fields, which would differ from the values seen at the beginning of the operation. Here, of course, we do not want to throw away all results and just try again; we only want to resolve the rows in conflict.
So I wonder what is a good approach to (1) store all Result.a and Result.b that are not in conflict and (2) get a List of Results that are in conflict and need special treatment.
public List<Result> store(Stream<Result> results) {
// store all a
// store all b (ideally without using results * 2 RAM)
// update other data if a and b are not in conflict, in the same ACID transaction as the update of the related a and b
// return those in Conflict
}
How can I implement it without unpacking each result and sending it to the DB in its own transaction? Ideally, I need to send everything to the DB at once and get back a list of the conflicts that have not been stored (while the others have been persisted). I am open to a different approach as well.
We use JPA/Hibernate if that is relevant.
To use optimistic locking, we need an entity with a property carrying the @Version annotation. Each transaction that reads the data keeps the value of the version property it saw. Before the transaction makes an update, it checks the version property again; if the value has changed in the meantime, the update is rejected (JPA signals this with an OptimisticLockException).
In most scenarios, optimistic concurrency control is more efficient and offers higher performance. When choosing between pessimistic and optimistic locking, consider the following: Pessimistic locking is useful if there are a lot of updates and relatively high chances of users trying to update data at the same time.
With optimistic locking, the key point is the structure of the UPDATE statement and the subsequent check of the number of affected rows. Together, these two things let your code detect that someone else modified the data between your SELECT and your UPDATE.
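The update-then-count check can be sketched without a database. This is only an illustration under assumptions: the in-memory map stands in for table a, and the names OptimisticStoreSketch, Key, Row, Update and the version column are hypothetical (the question uses updatedAt for the same purpose). Records need Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OptimisticStoreSketch {
    /** Hypothetical key: the (u1, u2) pair from the question. */
    record Key(long u1, long u2) {}

    /** Hypothetical row: payload plus a version counter (updatedAt would work the same way). */
    record Row(String data, long version) {}

    /** Hypothetical update: new payload plus the version read when the computation started. */
    record Update(Key key, String data, long expectedVersion) {}

    // Stand-in for the DB table; an assumption made so the sketch runs on its own.
    private final Map<Key, Row> table = new ConcurrentHashMap<>();

    void insert(Key key, String data) {
        table.put(key, new Row(data, 0));
    }

    Row read(Key key) {
        return table.get(key);
    }

    /**
     * Mimics: UPDATE a SET data = ?, version = version + 1
     *         WHERE u1 = ? AND u2 = ? AND version = ?
     * and returns the number of affected rows (0 or 1).
     */
    int updateIfUnchanged(Update u) {
        Row current = table.get(u.key());
        if (current == null || current.version() != u.expectedVersion()) {
            return 0; // someone else changed the row since we read it
        }
        Row next = new Row(u.data(), current.version() + 1);
        // Atomic compare-and-replace: fails if the row changed after the check above.
        return table.replace(u.key(), current, next) ? 1 : 0;
    }

    /** Applies every update that is still valid and returns the ones in conflict. */
    List<Update> store(List<Update> updates) {
        List<Update> conflicts = new ArrayList<>();
        for (Update u : updates) {
            if (updateIfUnchanged(u) == 0) {
                conflicts.add(u);
            }
        }
        return conflicts;
    }
}
```

With plain JDBC the same decision would be made from the update count returned by executeUpdate: a count of 0 means the row was modified in between and belongs in the conflict list.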
The easiest option would be to funnel persistence through a FIFO queue (many techniques exist, but in general this amounts to "single entry per transaction", which is not the desired approach).
For the second option, I would move the conflict-detection logic out of the database persist action and into a separate service.
You can implement something like an in-memory map from user id to ReentrantLock (those operations are really fast compared to synchronized blocks).
On the first call to persist, the lock is acquired; after a successful persist, the lock is released. In the meantime (in a separate thread) you can check the state of the lock and either filter out entries by it or wait until the lock is released. Be careful with the wait state: you have streams, so the whole thread processing a stream will enter the waiting state.
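A minimal sketch of such a lock map, assuming a single JVM and keying by the (u1, u2) pair rather than a single user id. KeyLockRegistry and its method names are hypothetical. It uses tryLock so that a busy key is reported as a conflict instead of blocking the thread that processes the stream.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class KeyLockRegistry {
    /** Hypothetical key: the (u1, u2) pair from the question. */
    record Key(long u1, long u2) {}

    private final Map<Key, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Tries to take the lock for the key without waiting; false means another thread holds it. */
    boolean tryAcquire(Key key) {
        return locks.computeIfAbsent(key, k -> new ReentrantLock()).tryLock();
    }

    /** Releases the lock if the calling thread holds it. */
    void release(Key key) {
        ReentrantLock lock = locks.get(key);
        if (lock != null && lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }

    /**
     * Runs the persist action under the key's lock.
     * Returns false (treat the entry as a conflict) if the key is currently locked elsewhere.
     */
    boolean tryWithLock(Key key, Runnable persist) {
        if (!tryAcquire(key)) {
            return false;
        }
        try {
            persist.run();
            return true;
        } finally {
            release(key);
        }
    }
}
```

Note that this map never removes entries and only guards writers inside one process; several application instances writing to the same database would still need a database-level check.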
Personally, I would stick to the first option, "single entry per transaction", with some (persistable) messaging queue in the middle and a separate service for the locking check. First, this would allow us to easily configure the concurrency of write operations; second, it makes it easy to use a wait state in the writer, because only one entry will be locked at a time.
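The queue-plus-writer idea can be sketched as follows. This is an assumption-laden illustration: the executor's internal in-memory queue stands in for a persistable message queue, the list stands in for the database, and SerializedWriter is a hypothetical name. The number of writer threads is the knob that configures write concurrency.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializedWriter implements AutoCloseable {
    // Writer threads drain the executor's FIFO queue; one thread means strictly serialized writes.
    private final ExecutorService writers;

    // Stand-in for the database; each add represents one "single entry per transaction" write.
    private final List<String> persisted = new CopyOnWriteArrayList<>();

    SerializedWriter(int writerThreads) {
        this.writers = Executors.newFixedThreadPool(writerThreads);
    }

    /** Enqueues one entry; the returned future completes once the entry has been written. */
    CompletableFuture<Void> enqueue(String entry) {
        return CompletableFuture.runAsync(() -> persisted.add(entry), writers);
    }

    /** Snapshot of what has been written so far. */
    List<String> persisted() {
        return List.copyOf(persisted);
    }

    @Override
    public void close() {
        writers.shutdown();
    }
}
```

With a single writer thread, entries are written in the order they were enqueued, so a writer that waits on one locked entry delays the queue but never blocks the threads that compute the stream.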