I need an advice on the data structure to use as an atomic change log.
I'm trying to implement the following algorithm. There is a flow of incoming changes updating an in-memory map. In Haskell-like pseudocode it is
update :: DataSet -> SomeListOf Change -> Change -> STM (DataSet, SomeListOf Change)
update dataSet existingChanges newChange = do
...
return (dataSet, existingChanges ++ [newChange])
where DataSet is a map (currently it is the Map from the stm-containers package, https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Map.html). The whole "update" is called from arbitrary number of threads. Some of the Change's can be rejected due to domain semantics, I use throwSTM for that to throw away the effect of the transaction. In case of successful commit the "newChange" is added to the list.
There exists separate thread which calls the following function:
flush :: STM (DataSet, SomeListOf Change) -> IO ()
this function is supposed to take the current snapshot of DataSet together with the list of changes (it has to a consistent pair) and flush it to the filesystem, i.e.
flush data = do
(dataSet, changes) <- atomically $ readTVar data_
-- write them both to FS
-- ...
atomically $ writeTVar data_ (dataSet, [])
I need an advice about the data structure to use for "SomeListOf Change". I don't want to use [Change] because it is "too ordered" and I'm afraid there will be too many conflicts, which will force the whole transaction to retry. Please correct me, if I'm wrong here.
I cannot use the Set (https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Set.html) because I still need to preserve some order, e.g. the order of transaction commits. I could use TChan for it and it looks like a good match (exactly the order of transaction commits), but I don't know how to implement the "flush" function so that it would give the consistent view of the whole change log together with the DataSet.
The current implementation of that is here https://github.com/lolepezy/rpki-pub-server/blob/add-storage/src/RRDP/Repo.hs, in the functions applyActionsToState and rrdpSyncThread, respectively. It uses TChan and seems to do it in a wrong way.
Thank you in advance.
Update: A reasonable answer seems to be like that
type SomeListOf c = TChan [c]
update :: DataSet -> TChan [Change] -> Change -> STM DataSet
update dataSet existingChanges newChange = do
...
writeTChan changeChan $ reverse (newChange : existingChanges)
return dataSet
flush data_ = do
(dataSet, changes) <- atomically $ (,) <$> readTVar data_ <*> readTChan changeChan
-- write them both to FS
-- ...
But I'm still not sure whether it's a neat solution to pass the whole list as an element of the channel.
I'd probably just go with the list and see how far it takes performance-wise. Given that, you should consider that both, appending to the end of a list and reversing it are O(n) operations, so you should try to avoid this. Maybe you can just prepend the incoming changes like this:
update dataSet existingChanges newChange = do
-- ...
return (dataSet, newChange : existingChanges)
Also, your example for flush has the problem that reading and updating the state is not atomic at all. You must accomplish this using a single atomically
call like so:
flush data = do
(dataSet, changes) <- atomically $ do
result <- readTVar data_
writeTVar data_ (dataSet, [])
return result
-- write them both to FS
-- ...
You could then just write them out in reverse order (because now changes
contains the elements from newest to oldest) or reverse here once if it's important to write them out oldest to newest. If that's important I'd probably go with some data structure which allows O(1) element access like a good old vector.
When using a fixed-size vector you would obviously have to deal with the problem that it can become "full" which would mean your writers would have to wait for flush
to do it's job before adding fresh changes. That's why I'd personally go for the simple list first and see if it's sufficient or where it needs to be improved.
PS: A dequeue might be a good fit for your problem as well, but going fixed size forces you to deal with the problem that your writers can potentially produce more changes than your reader can flush out. The dequeue can grow infinitely, but you your RAM probably isn't. And the vector has pretty low overhead.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With