
Managing dynamic number of threads

First of all, I'm still familiarizing myself with multi-threading, and don't know much terminology. I need to make sure I'm doing this right, because it's a sensitive subject.

Specifications

What I'm building is a component which will contain a dynamic number of threads. Each of these threads is re-used for performing a number of requests. I can provide all necessary details to the thread as I create it and before I execute it, as well as provide event handlers. Once it's executed, I'm pretty much done with one request, and I feed in another request. The requests are being fed into these threads from another stand-alone background thread which is constantly processing a queue of requests. So this system has two lists: 1) List of request records, and 2) List of thread pointers.

I'm using descendants of the TThread class (at least this is the threading method I'm familiar with). I'm getting feedback from the threads by synchronizing event triggers which I assigned when the threads were created. The threads are loading and saving data in the background, and when they're done, they reset themselves ready to process the next request.

Problem

Now the trouble begins when deciding how to handle a change in the number of allowed threads. This is set via a component property, ActiveThreads: TActiveThreadRange, where TActiveThreadRange = 1..20, so there can be anywhere between 1 and 20 threads created at a time. Suppose the application using this component changes this property from 5 to 3. At that point there are already 5 threads created, and I don't want to forcefully free a thread if it happens to be busy; I need to wait until it's done before I free it. On the other hand, if the property is changed from 3 to 5, then I need to create 2 new threads. I need to know the proper approach to 'keep track' of these threads in this scenario.

Possibilities

Here are some possible ways I can think of to 'track' these threads...

  • Keep a TList containing each created thread - easy to manage
  • Create a TList wrapper or descendant containing each created thread - easier to manage, but more work
  • Keep an array containing each created thread - Would this be better than a TList?
  • Create an array wrapper containing each created thread

But then back to my original issue - What to do with existing busy threads when the ActiveThreads property is decreased? Creating them is no problem, but releasing them is becoming confusing. I usually make threads which free themselves, but this is the first time I've made one which is re-used. I just need to know the proper method of destroying these threads without interrupting their tasks.

Update

Based on the feedback, I have acquired and begun implementing the OmniThreadLibrary (as well as the long-needed FastMM). I've also changed my approach a little, to one where I can create these threaded processes without managing them and without another thread to process the queue:

  • 1 master method to spawn a new process
    • function NewProcess(const Request: TProcessRequest): TProcessInfo;
    • TProcessRequest is a record with specifications of what's to be done (Filename, Options, etc.)
    • TProcessInfo is a record which passes back some status information.
  • Feed in an event handler for the event of being 'done' with its task when creating a new process. When the component receives this message, it will check the queue.
    • If a command is queued, it will compare the active process limit with the current process count
    • > If it exceeds the limit, just stop; the next completed process will perform the same check
    • > If within the limit, kick off another new process (after ensuring the previous process is done)
    • If no commands are queued, then just stop
  • Each process can die on its own after it has done its task (no keep-alive threading)
  • I won't have to worry about another timer or thread to continually loop through
    • Instead each process destroys itself and checks for new requests before doing so
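
The flow in this update can be sketched as a small console program. This is only a sketch under assumptions, not the component itself: TDispatcher, TProcessThread, Enqueue, StartNext and ProcessDone are hypothetical names, the request is reduced to a file name, the TProcessInfo result of NewProcess is left out, and Sleep stands in for the real work. It relies on TThread raising OnTerminate through Synchronize, so the queue and the counter are only ever touched from the main thread.

```pascal
program SpawnOnDoneDemo;
{$APPTYPE CONSOLE}
uses SysUtils, Classes;

type
  TProcessThread = class(TThread) // hypothetical: runs one request, then frees itself
  private
    FFilename: string;
  protected
    procedure Execute; override;
  public
    constructor Create(const AFilename: string; AOnDone: TNotifyEvent);
  end;

  // Hypothetical queue owner. Every method runs in the main thread, so no locks.
  TDispatcher = class
  private
    FQueue: TStringList; // pending file names, oldest first
    FActive: Integer;    // processes currently running
    FLimit: Integer;     // plays the role of ActiveThreads
    procedure ProcessDone(Sender: TObject);
    procedure StartNext;
  public
    constructor Create(ALimit: Integer);
    destructor Destroy; override;
    procedure Enqueue(const AFilename: string);
    function Busy: Boolean;
  end;

constructor TProcessThread.Create(const AFilename: string; AOnDone: TNotifyEvent);
begin
  inherited Create(False); // the thread only starts in AfterConstruction
  FFilename := AFilename;
  FreeOnTerminate := True; // no keep-alive: the thread dies after one request
  OnTerminate := AOnDone;  // TThread calls this through Synchronize
end;

procedure TProcessThread.Execute;
begin
  Sleep(100); // stand-in for loading or saving FFilename
end;

constructor TDispatcher.Create(ALimit: Integer);
begin
  inherited Create;
  FQueue := TStringList.Create;
  FLimit := ALimit;
end;

destructor TDispatcher.Destroy;
begin
  FQueue.Free;
  inherited;
end;

procedure TDispatcher.Enqueue(const AFilename: string);
begin
  FQueue.Add(AFilename);
  StartNext;
end;

procedure TDispatcher.StartNext;
begin
  // Start queued requests only while the running count is within the limit.
  while (FQueue.Count > 0) and (FActive < FLimit) do
  begin
    Inc(FActive);
    TProcessThread.Create(FQueue[0], ProcessDone);
    FQueue.Delete(0);
  end;
end;

procedure TDispatcher.ProcessDone(Sender: TObject);
begin
  Dec(FActive); // the finished process no longer counts
  StartNext;    // the same check runs after every completion
end;

function TDispatcher.Busy: Boolean;
begin
  Result := (FActive > 0) or (FQueue.Count > 0);
end;

var
  Dispatcher: TDispatcher;
  I: Integer;
begin
  Dispatcher := TDispatcher.Create(3);
  try
    for I := 1 to 8 do
      Dispatcher.Enqueue(Format('file%d.dat', [I]));
    // A console program has no message loop, so pump Synchronize calls by hand.
    while Dispatcher.Busy do
      CheckSynchronize(10);
  finally
    Dispatcher.Free;
  end;
end.
```

Because the limit is only consulted in StartNext, lowering it (for example from a property setter) would never interrupt a running process; the running count simply drains until it is back within the limit. Raising it would only need one extra call to StartNext.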

Another Update

I have actually reverted back to using a TThread, as the OTL is very uncomfortable to use. I like to keep things wrapped and organized in its own class.

Jerry Dodge asked Jan 26 '12 20:01


2 Answers

As explained by @NGLN etc, you need to pool some threads and accept that the easiest way to manage thread numbers is to divorce the actual number of threads from the desired number. Adding threads to the pool is easy - just create some more instances, (passing the producer-consumer task input queue as a parameter so that the thread knows what to wait on). If the desired number of threads is less than that currently existing, you could queue up enough 'poison-pills' to kill off the extra threads.

Don't keep any list of thread pointers - it's a load of micro-management hassle that's just not necessary, (and will probably go wrong). All you need to keep is a count of the number of desired threads in the pool so you know what action to take when something changes the 'poolDepth' property.

The event triggers are best loaded into the jobs that are issued to the pool - descend them all from some 'TpooledTask' class that takes an event as a constructor parameter and stores it in some 'FonComplete' TNotifyEvent. The thread that runs the task can call the FonComplete when it's done the job, (with the TpooledTask as the sender parameter) - you don't need to know what thread ran the task.

Example:

    unit ThreadPool;

    interface

    uses
      Windows, SysUtils, Classes, contnrs, syncobjs;


    type

    TpooledTask=class(TObject)
    private
      FonComplete:TNotifyEvent;
    protected
      Fparam:TObject;
      procedure execute; virtual; abstract;
    public
      constructor create(onComplete:TNotifyEvent;param:TObject);
    end;

    TThreadPool=class(TObjectQueue)
    private
      access:TcriticalSection;
      taskCounter:THandle;
      threadCount:integer;
    public
      constructor create(initThreads:integer);
      procedure addTask(aTask:TpooledTask);
    end;

    TpoolThread=class(Tthread)
    private
      FmyPool:TThreadPool;
    protected
      procedure Execute; override;
    public
      constructor create(pool:TThreadPool);
    end;

    implementation

    { TpooledTask }

    constructor TpooledTask.create(onComplete: TNotifyEvent; param: TObject);
    begin
      FonComplete:=onComplete;
      Fparam:=param;
    end;

    { TThreadPool }

    procedure TThreadPool.addTask(aTask: TpooledTask);
    begin
      access.acquire;
      try
        push(aTask);
      finally
        access.release;
      end;
      releaseSemaphore(taskCounter,1,nil); // release one unit to semaphore
    end;

    constructor TThreadPool.create(initThreads: integer);
    begin
      inherited create;
      access:=TcriticalSection.create;
      taskCounter:=createSemaphore(nil,0,maxInt,nil); // nil name: an unnamed semaphore
      while(threadCount<initThreads) do
      begin
        TpoolThread.create(self);
        inc(threadCount);
      end;
    end;

    { TpoolThread }

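    constructor TpoolThread.create(pool: TThreadPool);
    begin
      // A non-suspended thread only starts in AfterConstruction, so the fields
      // below are set before Execute runs; this avoids the deprecated Resume.
      inherited create(false);
      FmyPool:=pool;
      FreeOnTerminate:=true;
    end;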

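    procedure TpoolThread.execute;
    var thisTask:TpooledTask;
    begin
      // Block until the semaphore signals that at least one task is queued.
      while (WAIT_OBJECT_0=waitForSingleObject(FmyPool.taskCounter,INFINITE)) do
      begin
        // The queue itself is not thread-safe, so pop under the lock.
        FmyPool.access.acquire;
        try
          thisTask:=TpooledTask(FmyPool.pop);
        finally
          FmyPool.access.release;
        end;
        thisTask.execute;
        // Note: the handler runs in this worker thread, not the main thread.
        if assigned(thisTask.FonComplete) then thisTask.FonComplete(thisTask);
      end;
    end;

    end.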
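
The unit above does not include the 'poison-pill' step described earlier. One way it might look is sketched below as a stand-alone console program; it is an illustration under assumptions rather than part of the original answer. TPool, TWorker, Add and SetDepth are hypothetical names, nil is used as the pill, plain TObject instances stand in for tasks, and the FLive counter exists only so the demo can wait for the FreeOnTerminate workers before freeing the pool. TThread.Start assumes Delphi 2010 or later.

```pascal
program PoisonPillDemo;
{$APPTYPE CONSOLE}
uses Windows, Classes, Contnrs, SyncObjs;

type
  TPool = class(TObjectQueue) // hypothetical pool: keeps counts, not a thread list
  private
    FLock: TCriticalSection;
    FCounter: THandle; // semaphore counting queued items, pills included
    FDesired: Integer; // number of workers the caller currently wants
    FLive: Integer;    // workers that have not exited yet
  public
    constructor Create;
    destructor Destroy; override;
    procedure Add(AItem: TObject); // nil acts as the poison pill
    procedure SetDepth(ADesired: Integer);
  end;

  TWorker = class(TThread)
  private
    FPool: TPool;
  protected
    procedure Execute; override;
  end;

constructor TPool.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FCounter := CreateSemaphore(nil, 0, MaxInt, nil);
end;

destructor TPool.Destroy;
begin
  CloseHandle(FCounter);
  FLock.Free;
  inherited;
end;

procedure TPool.Add(AItem: TObject);
begin
  FLock.Acquire;
  try
    Push(AItem);
  finally
    FLock.Release;
  end;
  ReleaseSemaphore(FCounter, 1, nil); // wake one waiting worker
end;

procedure TPool.SetDepth(ADesired: Integer);
var W: TWorker;
begin
  while FDesired < ADesired do // grow: start the missing workers
  begin
    W := TWorker.Create(True); // suspended, so FPool is set before Execute runs
    W.FPool := Self;
    W.FreeOnTerminate := True;
    InterlockedIncrement(FLive);
    W.Start;
    Inc(FDesired);
  end;
  while FDesired > ADesired do // shrink: queue one pill per surplus worker
  begin
    Add(nil);
    Dec(FDesired);
  end;
end;

procedure TWorker.Execute;
var Item: TObject;
begin
  while WaitForSingleObject(FPool.FCounter, INFINITE) = WAIT_OBJECT_0 do
  begin
    FPool.FLock.Acquire;
    try
      Item := FPool.Pop;
    finally
      FPool.FLock.Release;
    end;
    if Item = nil then Break; // poison pill: leave the loop so the thread ends
    Sleep(50);                // stand-in for the real work
    Item.Free;
  end;
  InterlockedDecrement(FPool.FLive);
end;

var
  Pool: TPool;
  I: Integer;
begin
  Pool := TPool.Create;
  Pool.SetDepth(5);
  for I := 1 to 10 do Pool.Add(TObject.Create);
  Pool.SetDepth(3); // two pills queue up behind the ten work items
  Pool.SetDepth(0); // three more pills retire the remaining workers
  while Pool.FLive > 0 do Sleep(10); // wait until every worker has exited
  Pool.Free;
end.
```

Pills are queued behind pending work, so a shrink only takes effect once the earlier items have been processed, and a busy worker is never interrupted.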

Martin James answered Oct 30 '22 07:10


You can implement a FreeNotify message in your request queue; when a worker thread receives this message, it frees itself. In your example, when you decrease the number of threads from 5 to 3, just put 2 FreeNotify messages in your queue and 2 worker threads will free themselves.

Tomislav Avramovic answered Oct 30 '22 08:10