Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Unexpected data loss during static Queue.Enqueue in multithread process

private Queue<FrameObjectElement> _queueObject = new
Queue<FrameObjectElement>();

private static Queue<FrameObjectElement> _queueItem = new
Queue<FrameObjectElement>();

private static int permitEntryCount = 0;

private int allowThreadEntry = 0;

I have 2 queue variables as shown above.

public Camera(IVideoSource source, MotionDetector detector)
{
    VideoSource = source; 
    _motionDetector = detector;
    VideoSource.NewFrame += VideoNewFrame; 
    MainForm.OnRegisterClickedEvent += new MainForm.RegisterClickedEventHandler(MainForm_OnRegisterClickedEvent);
    MainForm.OnReceiveMultipleFrameEvent += new MainForm.ReceiveMultipleFrameEventHandler(MainForm_OnReceiveMultipleFrameEvent);
}

I have a Camera class, and shown above is part of the constructor implementation. The video source always listening to event VideoNewFrame; The code I show below is one segment of the code in VideoNewFrame.

FrameObjectElement frameObj = new FrameObjectElement();
frameObj.cameraID = CW.Camobject.id;
frameObj.cameraTag = _FPGAFrameCount / 2;
frameObj.FirstFrameBuffer = BitmapToRgbValues(twoframe_arg.GetFirstBitmap(352, 288));
frameObj.SecondFrameBuffer = BitmapToRgbValues(twoframe_arg.GetSecondBitmap(352, 288));

if (_queueObject.Count > 5)
    _queueObject.Clear();

_queueObject.Enqueue(frameObj);

if (allowThreadEntry == permitEntryCount && isClear) 
{
    if (_queueObject.Count !=  0)
    {
        lock (this)
        {
            _queueItem.Enqueue(_queueObject.Peek());
        }
        Debug.WriteLine("Thread ID: " + Thread.CurrentThread.ManagedThreadId.ToString() +
        " - " + _queueObject.Count.ToString() +
        " queue in QueueObject : Camera ID : " + _queueObject.Peek().cameraID.ToString() +
        " : Camera Tag : " + _queueObject.Peek().cameraTag.ToString() + 
        " : Queue item count : " + _queueItem.Count.ToString());

        _queueObject.Dequeue();

        if (_queueItem.Count == numberOfCamera && isAllow)
        {
            CurrentID = CW.Camobject.id;
            isAllow = false;
        }

        allowThreadEntry++;
        if (_queueItem.Count == numberOfCamera)
        {
            if (CurrentID == CW.Camobject.id)
            {
                isClear = false;
                //allowEntry = false;

                //Debug.WriteLine("-- Count: " + allowThreadEntry.ToString() + " --");

                foreach (FrameObjectElement foE in _queueItem)
                {
                    Debug.WriteLine("Current Camera ID: " + CW.Camobject.id +
                        " : Camera ID : " + foE.cameraID.ToString() +
                        " : Camera Tag : " + foE.cameraTag.ToString() + " :");
                }

                MultipleFrameEventArgs newMul = new MultipleFrameEventArgs();
                newMul.itemObj = _queueItem;

                if (OnMultupleFrameEvent != null)
                    OnMultupleFrameEvent(newMul);

                _queueItem.Clear();
                isAllow = true;
                isClear = true;
                Debug.WriteLine("Queue item count: " + _queueItem.Count.ToString() +
                    " : isClear : " + isClear.ToString());
            }
        }   
    }
}

Basically what am I trying to achieve here is to collect the frame id, tag, its first and second frame and then store in an object(struct FrameObjectElement). Every collection of 2 frames will represent 1 camera tag, thus the meaning of its role here. Then the frameobject is enqueued in _queueObject. Next I would have a condition if(allowThreadEntry == permitEntryCount). So what is being done here is every time this function is accessed, allowThreadEntry will increment while permitCountEntry remains the same. Then this function will go on and enqueue the first element in _queueObject to _queueItem and once a desired count of _queueItem is satisfied, it would raise a signal to another Class. This class will respond back by raising a signal which Camera class has earlier subscribed as shown MainForm.OnReceiveMultipleFrameEvent += new MainForm.ReceiveMultipleFrameEvent(MainForm_OnReceiveMultipleFrameEvent).

void
MainForm_OnReceiveMultipleFrameEvent(MainForm.ReceiveMultpleFrameEventArgs
e)  {       permitEntryCount++;     }

Upon receiving the signal, permitEntryCount will be incremented thus allowing the function to be accessed again. Why I do this is because this class is created depends on how many object camera I have. If I have 11 cameras, I would have 11 workerThread running processing this class. I queue up their frames in a non static queue and gather their first element into a static queue which is to be passed for my other process. The problem I am facing here is as below:

=============================  Count : 1760 ===============================
Queue item count: 0 : isClear : True
Thread ID: 17 - 3 queue in QueueObject : Camera ID : 3 : Camera Tag :
3372 : Queue item count : 1
Thread ID: 24 - 6 queue in QueueObject : Camera ID :10 : Camera Tag :
4367 : Queue item count : 2
Thread ID: 23 - 5 queue in QueueObject : Camera ID : 9 : Camera Tag :
4415 : Queue item count : 3
Thread ID: 19 - 1 queue in QueueObject : Camera ID : 5 : Camera Tag :
4108 : Queue item count : 4
Thread ID: 20 - 5 queue in QueueObject : Camera ID : 6 : Camera Tag :
3768 : Queue item count : 5
Thread ID: 14 - 1 queue in QueueObject : Camera ID : 0 : Camera Tag :
2837 : Queue item count : 6
Thread ID: 21 - 1 queue in QueueObject : Camera ID : 7 : Camera Tag :
3246 : Queue item count : 7
Thread ID: 16 - 1 queue in QueueObject : Camera ID : 2 : Camera Tag :
3552 : Queue item count : 8
Thread ID: 18 - 6 queue in QueueObject : Camera ID : 4 : Camera Tag :
3117 : Queue item count : 9
Thread ID: 15 - 3 queue in QueueObject : Camera ID :1 : Camera Tag :
2315 : Queue item count : 10
Thread ID: 22 - 4 queue in QueueObject : Camera ID :8 : Camera Tag :
4853 : Queue item count : 11
Current Camera ID: 8 : Camera ID : 3 : Camera Tag : 3372 :
Current Camera ID: 8 : Camera ID :10 : Camera Tag : 4367 :
Current Camera ID: 8 : Camera ID : 9 : Camera Tag : 4415 :
Current Camera ID: 8 : Camera ID : 5 : Camera Tag : 4108 :
Current Camera ID: 8 : Camera ID : 6 : Camera Tag : 3768 :
Current Camera ID: 8 : Camera ID : 0 : Camera Tag : 2837 :
Current Camera ID: 8 : Camera ID : 7 : Camera Tag : 3246 :
Current Camera ID: 8 : Camera ID : 2 : Camera Tag : 3552 :
Current Camera ID: 8 : Camera ID : 4 : Camera Tag : 3117 :
Current Camera ID: 8 : Camera ID : 1 : Camera Tag : 2315 :
Current Camera ID: 8 : Camera ID : 8 : Camera Tag : 4853 :
=============================  Count : 1761 ===============================
Queue item count: 0 : isClear : True
Thread ID: 14 - 1 queue in QueueObject : Camera ID : 0 : Camera Tag :
2838 : Queue item count : 1
Thread ID: 16 - 1 queue in QueueObject : Camera ID : 2 : Camera Tag :
3553 : Queue item count : 2
Thread ID: 21 - 1 queue in QueueObject : Camera ID : 7 : Camera Tag :
3247 : Queue item count : 3
Thread ID: 24 - 1 queue in QueueObject : Camera ID :10 : Camera Tag :
4374 : Queue item count : 4
Thread ID: 23 - 6 queue in QueueObject : Camera ID : 9 : Camera Tag :
4416 : Queue item count : 5
Thread ID: 17 - 4 queue in QueueObject : Camera ID : 3 : Camera Tag :
3373 : Queue item count : 7
Thread ID: 15 - 3 queue in QueueObject : Camera ID : 1 : Camera Tag :
2316 : Queue item count : 7
Thread ID: 18 - 6 queue in QueueObject : Camera ID : 4 : Camera Tag :
3118 : Queue item count : 8
Thread ID: 20 - 6 queue in QueueObject : Camera ID : 6 : Camera Tag :
3769 : Queue item count : 9
Thread ID: 22 - 4 queue in QueueObject : Camera ID :8 : Camera Tag :
4854 : Queue item count : 10

I should have different count number in _queueItem since every object created can access only one time in this segment thus letting me know that their element will be enqueued into _queueItem. But unfortunately, somehow after the program runs a while, there will be an occurrence as shown above. Either I apply lock or not on this part _queueItem.Enqueue(_queueObject.Peek()); I will still have the problem. May I know where I did wrong?

like image 950
cy.lee Avatar asked Feb 18 '23 15:02

cy.lee


1 Answers

You say the queue is static, but you've locked against an instance:

lock (this)
{
    _queueItem.Enqueue(_queueObject.Peek());
}

If you have multiple instances, that means each are locking independently. A better approach would be to have a dedicated static lock-object, and lock against that. You might be able to cheat with:

lock (_queueItem)
{
    _queueItem.Enqueue(_queueObject.Peek());
}

if _queueItem is never reassigned, but the safest approach is:

static readonly object lockObj = new object();
lock (lockObj)
{
    _queueItem.Enqueue(_queueObject.Peek());
}

Note that all access to the queue must be synchronized, and must all use the same lock-object.

You might be able to reduce a bit of contention if you talk to the two queues separately, but try to avoid using nested locks in this scenario, as that can cause deadlocks if done badly; for example, to peek from the instance queue and enqueue into the static queue, you could use:

object item;
lock(instanceLock) {
    item = _queueObject.Peek();
}
lock(staticLock) {
    _queueItem.Enqueue(item);
}

Note also that even things as simple as .Count need to be synchronized, and ideally double-checked (you can't check a count early in the method, and then later on assuming there is still something to dequeue, unless you keep the lock for that entire duration). Your code makes repeated use of .Count - so please be very careful about that. .Count is transient: as soon as you've read it, if you relinquish the lock, you must assume that it is already wrong.

like image 162
Marc Gravell Avatar answered Apr 28 '23 05:04

Marc Gravell