I am trying to use TThreadedQueue (Generics.Collections) in a single-producer, multiple-consumer scheme (Delphi XE). The idea is to push objects into a queue and let several worker threads drain it.
It does not work as expected, though. When two or more worker threads call PopItem, access violations are raised from inside TThreadedQueue.
If the call to PopItem is serialized with a critical section, all is fine.
Surely TThreadedQueue should be able to handle multiple consumers, so am I missing something, or is this simply a bug in TThreadedQueue?
Here is a simple example that reproduces the error.
program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
  // FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type
  TThreadTaskMsg = class(TObject)
  private
    threadID  : integer;
    threadMsg : string;
  public
    Constructor Create( ID : integer; const msg : string);
  end;

type
  TThreadReader = class(TThread)
  private
    fPopQueue  : TThreadedQueue<TObject>;
    fSync      : TCriticalSection;
    fMsg       : TThreadTaskMsg;
    fException : Exception;
    procedure DoSync;
    procedure DoHandleException;
  public
    Constructor Create( popQueue : TThreadedQueue<TObject>;
                        sync : TCriticalSection);
    procedure Execute; override;
  end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync : TCriticalSection);
begin
  fPopQueue:= popQueue;
  fMsg:= nil;
  fSync:= sync;
  Self.FreeOnTerminate:= FALSE;
  fException:= nil;
  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var
  signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
    except
      FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
    end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;
  threadID:= ID;
  threadMsg:= msg;
end;

var
  fSync : TCriticalSection;
  fThreadQueue : TThreadedQueue<TObject>;
  fReaderArr : array[1..4] of TThreadReader;
  i : integer;
begin
  try
    IsMultiThread:= TRUE;
    fSync:= TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads call PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
      WriteLn('Init done. Pushing items ...');
      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
      ReadLn;
    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;
  except
    on E: Exception do
    begin
      Writeln(E.ClassName, ': ', E.Message);
      ReadLn;
    end;
  end;
end.

Update: The error in TMonitor that caused TThreadedQueue to crash is fixed in Delphi XE2.
Update 2: The above test stressed the queue in the empty state. Darian Miller found that stressing the queue in the full state could still reproduce the error in XE2. The error is once again in TMonitor. See his answer below for more information and a link to QC101114.
Update 3: With Delphi XE2 Update 4 there was an announced fix for TMonitor that would cure the problems in TThreadedQueue. My tests so far are not able to reproduce any errors in TThreadedQueue anymore.
Tested single producer/multiple consumer threads when the queue is empty and full.
Also tested multiple producers/multiple consumers. I varied the reader threads and writer threads from 1 to 100 without any glitch. But knowing the history, I dare others to break TMonitor.
Well, it's hard to be sure without a lot of testing, but it certainly looks like this is a bug, either in TThreadedQueue or in TMonitor. Either way it's in the RTL and not your code. You ought to file this as a QC report and use your example above as the "how to reproduce" code.
I recommend using OmniThreadLibrary (http://www.thedelphigeek.com/search/label/OmniThreadLibrary) when working with threads, parallelism, etc. Primoz did a very good job, and on the site you'll find a lot of useful documentation.
Your example seems to work fine under XE2, but if we fill your queue it fails with an AV on PushItem (tested under XE2 Update 1).
To reproduce, just increase your task creation from 100 to 1100 (your queue depth was set at 1024):
for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
This dies for me every time on Windows 7. I initially tried a continual push to stress test it, and it failed at loop 30, then at loop 16, then at loop 65. The interval varied, but it consistently failed at some point.
iLoop := 0;
while iLoop < 1000 do
begin
  Inc(iLoop);
  WriteLn('Loop: ' + IntToStr(iLoop));
  for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
end;