
Multithreaded Synchronised List<T>

I have a requirement to store a simple cache of a list of items. I was using List<T> for this purpose, but we have now changed the design to accommodate multiple threads.

The architecture of the system is event-driven, so it's quite likely that a read and a write operation could collide. Since the vast majority of accesses will be read-only, I thought that ReaderWriterLockSlim might be a good candidate. The cache only needs to be accurate at the moment it is read.

I have written the code below and it seems to work OK, but are there any potential pain points?

UPDATE: Whilst the code below does fix some synchronisation problems, it's not 100% perfect. I have since decided to implement a much simpler class that doesn't expose all of the IList<T> operations and is therefore 'safer' to re-use.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

public class SynchronisedList<T> : IList<T>
{
    private ReaderWriterLockSlim cacheLock = new ReaderWriterLockSlim();
    private IList<T> innerCache = new List<T>();

    private U ReadReturn<U>(Func<U> function)
    {
        cacheLock.EnterReadLock();
        try { return function(); }
        finally { cacheLock.ExitReadLock(); }
    }

    private void Read(Action action)
    {
        cacheLock.EnterReadLock();
        try { action(); }
        finally { cacheLock.ExitReadLock(); }
    }

    private U WriteReturn<U>(Func<U> function)
    {
        cacheLock.EnterWriteLock();
        try { return function(); }
        finally { cacheLock.ExitWriteLock(); }
    }

    private void Write(Action action)
    {
        cacheLock.EnterWriteLock();
        try { action(); }
        finally { cacheLock.ExitWriteLock(); }
    }

    public T this[int index]
    {
        get { return ReadReturn(() => innerCache[index]); }
        set { Write(() => innerCache[index] = value); }
    }

    public int IndexOf(T item) { return ReadReturn(() => innerCache.IndexOf(item)); }
    public void Insert(int index, T item) { Write(() => innerCache.Insert(index, item)); }
    public void RemoveAt(int index) { Write(() => innerCache.RemoveAt(index)); }
    public void Add(T item) { Write(() => innerCache.Add(item)); }
    public void Clear() { Write(() => innerCache.Clear()); }
    public bool Contains(T item) { return ReadReturn(() => innerCache.Contains(item)); }
    public int Count { get { return ReadReturn(() => innerCache.Count); } }
    public bool IsReadOnly { get { return ReadReturn(() => innerCache.IsReadOnly); } }
    public void CopyTo(T[] array, int arrayIndex) { Read(() => innerCache.CopyTo(array, arrayIndex)); }
    public bool Remove(T item) { return WriteReturn(() => innerCache.Remove(item)); }
    public IEnumerator<T> GetEnumerator() { return ReadReturn(() => innerCache.GetEnumerator()); }
    IEnumerator IEnumerable.GetEnumerator() { return ReadReturn(() => (innerCache as IEnumerable).GetEnumerator()); }
}

internal class Program
{
    private static SynchronisedList<int> list = new SynchronisedList<int>();

    private static void Main()
    {          
        for (int i = 0; i < 500000; i++)
        {
            var index = i;
            ThreadPool.QueueUserWorkItem((state) =>
            {
                var threadNum = (int)state;
                if (threadNum % 10 == 0)
                {
                    list.Add(threadNum);
                }
                else
                {
                    Console.WriteLine(list.Count);
                }
            }, index);
        }
        Console.ReadKey();
    }
}
Codebrain asked Oct 15 '09 10:10




3 Answers

Are you aware of the built-in SynchronizedCollection<T> class?

It uses standard Monitor-based locking rather than ReaderWriterLockSlim. You'd need to profile to determine whether this makes a significant performance difference in your particular usage scenarios.
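To make the comparison concrete, here is a minimal sketch of Monitor-based locking using the C# lock statement. MonitorList<T> is a hypothetical name used only for illustration; it is not the source of SynchronizedCollection<T>, it just shows the locking style, in which readers and writers all take the same exclusive lock.

```csharp
using System.Collections.Generic;

// Hypothetical illustration: a tiny list guarded by a single Monitor lock.
// Unlike ReaderWriterLockSlim, concurrent readers also block each other here.
public class MonitorList<T>
{
    private readonly object _sync = new object();
    private readonly List<T> _items = new List<T>();

    public void Add(T item)
    {
        // lock (...) compiles down to Monitor.Enter / Monitor.Exit in a try/finally.
        lock (_sync) { _items.Add(item); }
    }

    public bool Contains(T item)
    {
        lock (_sync) { return _items.Contains(item); }
    }

    public int Count
    {
        get { lock (_sync) { return _items.Count; } }
    }
}
```

Whether the simpler exclusive lock is slower than a reader/writer lock for a read-heavy cache depends on contention and on how long each operation holds the lock, which is why profiling is the only reliable way to decide.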

LukeH answered Sep 20 '22 22:09


There are a couple of threading issues here.

1. I think the GetEnumerator functions expose a threading issue. They hand out an enumerator over innerCache that is no longer protected by your locks once the method has returned.

For example, it can break down if one thread is doing a foreach over the list while another thread is removing or inserting elements.

The solution would be to copy the list and return an enumerator over that newly cloned list instead. The drawback would be memory usage if the list is long.

2. The Contains() and IndexOf() functions are more or less useless unless you have another locking mechanism outside of the synchronised list.

Example: Thread A gets the index of an object, Thread B inserts, removes or updates that object, and Thread A's index is now stale.
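The copy-then-enumerate fix from point 1 could be sketched roughly as follows. SnapshotList<T> is a hypothetical name, and the class is deliberately cut down to Add plus enumeration; it is a sketch under those assumptions rather than a drop-in replacement for the class in the question.

```csharp
using System.Collections;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration: enumeration works on a private copy of the data.
public class SnapshotList<T> : IEnumerable<T>
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private readonly List<T> _items = new List<T>();

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try { _items.Add(item); }
        finally { _lock.ExitWriteLock(); }
    }

    public IEnumerator<T> GetEnumerator()
    {
        T[] snapshot;
        _lock.EnterReadLock();
        try { snapshot = _items.ToArray(); }   // copy while the read lock is held
        finally { _lock.ExitReadLock(); }

        // The caller iterates over the copy, so later writes cannot invalidate it.
        return ((IEnumerable<T>)snapshot).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
}
```

Each foreach pays for one array copy, which is the memory cost mentioned above. In exchange, the loop body may even call Add on the same list without throwing, because the lock is not held during iteration and the loop only sees the items that existed when it started.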


I don't think a fully synchronised list is a great idea. Write a customised version with limited functionality instead.

If you only need a queue or stack, implement that with only the two or three necessary methods, all fully synchronised. If you need more functionality than that, use a List and have the different threads do the synchronisation.
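As one possible shape for such a limited class, the sketch below exposes the check-then-add sequence from point 2 as a single method, so the check and the write happen under one lock. UniqueList<T> and AddIfAbsent are hypothetical names chosen for illustration; the upgradeable read lock is one way to do this with ReaderWriterLockSlim, not the only one.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration: only the operations the cache needs are exposed,
// and the compound "check, then add" step is atomic.
public class UniqueList<T>
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private readonly List<T> _items = new List<T>();

    // Returns true if the item was added, false if it was already present.
    public bool AddIfAbsent(T item)
    {
        // Only one thread at a time may hold the upgradeable lock, so no other
        // AddIfAbsent call can slip in between the check and the write.
        _lock.EnterUpgradeableReadLock();
        try
        {
            if (_items.Contains(item)) return false;

            _lock.EnterWriteLock();
            try { _items.Add(item); }
            finally { _lock.ExitWriteLock(); }
            return true;
        }
        finally { _lock.ExitUpgradeableReadLock(); }
    }

    public int Count
    {
        get
        {
            _lock.EnterReadLock();
            try { return _items.Count; }
            finally { _lock.ExitReadLock(); }
        }
    }
}
```

Ordinary readers such as Count can still run while another thread is in the checking phase; they are only blocked for the short time the write lock is actually held.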

Mats Fredriksson answered Sep 19 '22 22:09


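This class addresses the problems described above, as long as every multi-step operation (including enumeration) is wrapped in a scope.

Race conditions are avoided by using scopes, which work much like transactions in a database: the lock is held for the whole using block.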

Client code

List<T> unsafeList = ... 
var threadSafeList = new SyncronisedList<T>(unsafeList);
using (threadSafeList.EnterReadScope()) {
   // all your sequential read operations are thread-safe
}
using (threadSafeList.EnterWriteScope()) {
   // all sequential read/write operations are thread-safe
}

Class code

public class SyncronisedList<T> : IList<T> {
    private readonly ReaderWriterLockSlim _threadLock;
    private readonly IList<T> _internalList;

    public SyncronisedList() : this(new List<T>()) {
    }

    public SyncronisedList(IList<T> internalList) {
        _internalList = internalList;
        _threadLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    }


    private U Read<U>(Func<U> function) {
        using (EnterReadScope())
            return function();
    }

    private void Read(Action action) {
        using (EnterReadScope())
            action();
    }

    private U Write<U>(Func<U> function) {
        using (EnterWriteScope())
            return function();
    }

    private void Write(Action action) {
        using (EnterWriteScope())
            action();
    }

    public IDisposable EnterReadScope() {
        return new Scope<T>(this, false);
    }

    public IDisposable EnterWriteScope() {
        return new Scope<T>(this, true);
    }

    public T this[int index] {
        get { return Read(() => _internalList[index]); }
        set { Write(() => _internalList[index] = value); }
    }

    public int IndexOf(T item) { return Read(() => _internalList.IndexOf(item)); }
    public void Insert(int index, T item) { Write(() => _internalList.Insert(index, item)); }
    public void RemoveAt(int index) { Write(() => _internalList.RemoveAt(index)); }
    public void Add(T item) { Write(() => _internalList.Add(item)); }
    public void Clear() { Write(() => _internalList.Clear()); }
    public bool Contains(T item) { return Read(() => _internalList.Contains(item)); }
    public int Count { get { return Read(() => _internalList.Count); } }
    public bool IsReadOnly { get { return Read(() => _internalList.IsReadOnly); } }
    public void CopyTo(T[] array, int arrayIndex) { Read(() => _internalList.CopyTo(array, arrayIndex)); }
    public bool Remove(T item) { return Write(() => _internalList.Remove(item)); }
    public IEnumerator<T> GetEnumerator() { return Read(() => _internalList.GetEnumerator()); }
    IEnumerator IEnumerable.GetEnumerator() { return Read(() => (_internalList as IEnumerable).GetEnumerator()); }


    private class Scope<U> : IDisposable {
        private readonly SyncronisedList<U> _owner;
        private readonly bool _write;

        internal Scope(SyncronisedList<U> owner, bool write) {
            _owner = owner;
            _write = write;
            if (_write)
                _owner._threadLock.EnterWriteLock();
            else
                _owner._threadLock.EnterReadLock();
        }

        public void Dispose() {
            if (_write)
                _owner._threadLock.ExitWriteLock();
            else
                _owner._threadLock.ExitReadLock();
        }

    }

}
Herman Schoenfeld answered Sep 17 '22 22:09