管理动态线程数

发布于 2024-12-29 08:18:22 字数 1960 浏览 2 评论 0原文

首先,我对多线程还很熟悉,不懂太多术语。我需要确保我做得正确,因为这是一个敏感话题。

规格

我正在构建的是一个包含动态数量线程的组件。这些线程中的每一个都被重新用于执行多个请求。我可以在创建线程时和执行线程之前向其提供所有必要的详细信息,并提供事件处理程序。一旦执行,我就基本完成了一个请求,然后输入另一个请求。请求从另一个独立的后台线程输入到这些线程中,该线程不断处理请求队列。所以这个系统有两个列表:1)请求记录列表,2)线程指针列表。

我正在使用 TThread 类的后代(至少这是我熟悉的线程方法)。我通过同步创建线程时分配的事件触发器来从线程获取反馈。线程在后台加载和保存数据,完成后,它们会重置自己以准备处理下一个请求。

问题

现在,当决定如何处理更改允许线程数的事件时,麻烦就开始了(通过组件 ActiveThreads: TActiveThreadRange 的属性,其中 TActiveThreadRange代码> = 1..20)。因此,一次可以创建 1 到 20 个线程。但是,假设使用此组件的应用程序将此属性从 5 更改为 3。此时,已经创建了 5 个线程,如果该线程恰好很忙,我不想强​​制释放该线程。我需要等到它完成后再释放它。另一方面,如果属性从 3 更改为 5,那么我需要创建 2 个新线程。我需要知道在这种情况下“跟踪”这些线程的正确方法。

可能性

以下是我能想到的一些可能的方法来“跟踪”这些线程...

  • 保留一个包含每个创建的线程的 TList - 易于管理
  • 创建一个 TList 包含每个创建的线程的包装器或后代 - 更易于管理,但工作量更大
  • 保留包含每个创建的线程的 array - 这会比 TList 更好吗?
  • 创建一个包含每个创建的线程的数组包装器

但是回到我原来的问题 - 当 ActiveThreads 属性减少时如何处理现有的繁忙线程?创建它们没有问题,但释放它们却变得令人困惑。我通常制作可以自行释放的线程,但这是我第一次制作可以重复使用的线程。我只需要知道销毁这些线程而不中断其任务的正确方法。

更新

根据反馈,我已经获取并开始实现 OmniThreadLibrary(以及长期需要的 FastMM)。我还稍微改变了我的方法 - 一种可以创建这些线程进程而无需管理它们并且无需另一个线程来处理队列的方法...

  • 1 个生成新进程的主方法
    • function NewProcess(const Request: TProcessRequest): TProcessInfo;
    • TProcessRequest 是一条记录,其中包含要执行的操作的规范(文件名、选项等)
    • TProcessInfo 是传回一些状态信息的记录。
  • 创建新流程时,输入任务“完成”事件的事件处理程序。当组件收到此消息时,它将检查队列。
    • 如果命令已排队,它将比较活动进程限制与当前进程计数
    • >如果超出限制,则停止,下一个完成的进程将执行相同的检查
    • >如果在限制范围内,则启动另一个新流程(在确保完成前一个流程后)
    • 如果没有命令排队,则停止
  • 每个进程在完成其任务后都会自行死亡(没有保持活动线程)
  • 我不会必须担心另一个计时器或线程不断循环
    • 相反,每个进程都会销毁自己并在执行此操作之前检查新请求

另一个更新

我实际上已恢复使用TThread,因为 OTL 非常不舒服使用。我喜欢将事物包装并组织在自己的类中。

First of all, I'm still familiarizing myself with multi-threading, and don't know much terminology. I need to make sure I'm doing this right, because it's a sensitive subject.

Specifications

What I'm building is a component which will contain a dynamic number of threads. Each of these threads is re-used for performing a number of requests. I can provide all necessary details to the thread as I create it and before I execute it, as well as provide event handlers. Once it's executed, I'm pretty much done with one request, and I feed in another request. The requests are being fed into these threads from another stand-alone background thread which is constantly processing a queue of requests. So this system has two lists: 1) List of request records, and 2) List of thread pointers.

I'm using descendants of the TThread class (at least this is the threading method I'm familiar with). I'm getting feedback from the threads by synchronizing event triggers which I assigned when the threads were created. The threads are loading and saving data in the background, and when they're done, they reset themselves ready to process the next request.

Problem

Now the trouble begins when deciding how to handle the event of changing the number of allowed threads (via a property of the component ActiveThreads: TActiveThreadRange which TActiveThreadRange = 1..20). Therefore, there can be anywhere between 1 and 20 threads created at a time. But when, let's say, the application using this component changes this property from 5 to 3. At this time, there are already 5 threads created, and I don't want to forcefully free that thread if it happens to be busy. I need to wait until it's done before I free it. And on the other hand, if the property is changed from 3 to 5, then I need to create 2 new threads. I need to know the proper approach to 'keep track' of these threads in this scenario.

Possibilities

Here are some possible ways I can think of to 'track' these threads...

  • Keep a TList containing each created thread - easy to manage
  • Create a TList wrapper or descendant containing each created thread - easier to manage, but more work
  • Keep an array containing each created thread - Would this be better than a TList?
  • Create an array wrapper containing each created thread

But then back to my original issue - What to do with existing busy threads when the ActiveThreads property is decreased? Creating them is no problem, but releasing them is becoming confusing. I usually make threads which free themselves, but this is the first time I've made one which is re-used. I just need to know the proper method of destroying these threads without interrupting their tasks.

Update

Based on the feedback, I have acquired and begun implementing the OmniThreadLibrary (as well as the long needed FastMM). I've also changed my approach a little - A way that I can create these threaded processes without managing them and without another thread to process the queue...

  • 1 master method to spawn a new process
    • function NewProcess(const Request: TProcessRequest): TProcessInfo;
    • TProcessRequest is a record with specifications of what's to be done (Filename, Options, etc.)
    • TProcessInfo is a record which passes back some status information.
  • Feed in an event handler for the event of being 'done' with its task when creating a new process. When component receives this message, it will check the queue.
    • If command is queued, it will compare the active process limit with current process count
    • > If exceeds limit, just stop and next completed process will do perform same check
    • > If within limit, kick off another new process (after ensuring previous process is done)
    • If no commands are queued, then just stop
  • Each process can die on its own after it has done its task (no keep-alive threading)
  • I won't have to worry about another timer or thread to continually loop through
    • Instead each process destroys its self and checks for new requests before doing so

Another Update

I have actually reverted back to using a TThread, as the OTL is very uncomfortable to use. I like to keep things wrapped and organized in its own class.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

人生百味 2025-01-05 08:18:22

正如 @NGLN 等所解释的,您需要池化一些线程,并接受管理线程数的最简单方法是将实际线程数与所需数量分开。将线程添加到池中很容易 - 只需创建更多实例(将生产者-消费者任务输入队列作为参数传递,以便线程知道要等待什么)。如果所需的线程数少于当前现有的线程数,您可以排队足够的“毒丸”来杀死多余的线程。

不要保留任何线程指针列表 - 这是不必要的微观管理麻烦的负载(并且可能会出错)。您需要保留的只是池中所需线程数量的计数,以便您知道当“poolDepth”属性发生更改时要采取什么操作。

事件触发器最好加载到发布到池的作业中 - 它们全部来自某个“TpooledTask”类,该类将事件作为构造函数参数并将其存储在某个“FonComplete”TNotifyEvent 中。运行任务的线程可以在完成工作后调用 FonComplete(使用 TpooledTask 作为发送者参数) - 您不需要知道哪个线程运行该任务。

例子:

    unit ThreadPool;

    interface

    uses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, StdCtrls, contnrs, syncobjs;


    type

    TpooledTask=class(TObject)
    private
      FonComplete:TNotifyEvent;
    protected
      Fparam:TObject;
      procedure execute; virtual; abstract;
    public
      constructor create(onComplete:TNotifyEvent;param:TObject);
    end;

    TThreadPool=class(TObjectQueue)
    private
      access:TcriticalSection;
      taskCounter:THandle;
      threadCount:integer;
    public
      constructor create(initThreads:integer);
      procedure addTask(aTask:TpooledTask);
    end;

    TpoolThread=class(Tthread)
    private
      FmyPool:TThreadPool;
    protected
      procedure Execute; override;
    public
      constructor create(pool:TThreadPool);
    end;

    implementation

    { TpooledTask }

    constructor TpooledTask.create(onComplete: TNotifyEvent; param: TObject);
    begin
      FonComplete:=onComplete;
      Fparam:=param;
    end;

    { TThreadPool }

    procedure TThreadPool.addTask(aTask: TpooledTask);
    begin
      access.acquire;
      try
        push(aTask);
      finally
        access.release;
      end;
      releaseSemaphore(taskCounter,1,nil); // release one unit to semaphore
    end;

    constructor TThreadPool.create(initThreads: integer);
    begin
      inherited create;
      access:=TcriticalSection.create;
      taskCounter:=createSemaphore(nil,0,maxInt,'');
      while(threadCount<initThreads) do
      begin
        TpoolThread.create(self);
        inc(threadCount);
      end;
    end;

    { TpoolThread }

    constructor TpoolThread.create(pool: TThreadPool);
    begin
      inherited create(true);
      FmyPool:=pool;
      FreeOnTerminate:=true;
      resume;
    end;

procedure TpoolThread.execute;
var thisTask:TpooledTask;
begin
  while (WAIT_OBJECT_0=waitForSingleObject(FmyPool.taskCounter,INFINITE)) do
  begin
    FmyPool.access.acquire;
    try
      thisTask:=TpooledTask(FmyPool.pop);
    finally
      FmyPool.access.release;
    end;
    thisTask.execute;
    if assigned(thisTask.FonComplete) then thisTask.FonComplete(thisTask);
  end;
end;

end.

As explained by @NGLN etc, you need to pool some threads and accept that the easiest way to manage thread numbers is to divorce the actual number of threads from the desired number. Adding threads to the pool is easy - just create some more instances, (passing the producer-consumer task input queue as a parameter so that the thread knows what to wait on). If the desired number of threads is less than that currently existing, you could queue up enough 'poison-pills' to kill off the extra threads.

Don't keep any list of thread pointers - it's a load of micro-management hassle that's just not necessary, (and will probably go wrong). All you need to keep is a count of the number of desired threads in the pool so you know what action to take when something changes the 'poolDepth' property.

The event triggers are best loaded into the jobs that are issued to the pool - descend them all from some 'TpooledTask' class that takes an event as a constructor parameter and stores it in some 'FonComplete' TNotifyEvent. The thread that runs the task can call the FonComplete when it's done the job, (with the TpooledTask as the sender parameter) - you don't need to know what thread ran the task.

Example:

    unit ThreadPool;

    interface

    uses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, StdCtrls, contnrs, syncobjs;


    type

    TpooledTask=class(TObject)
    private
      FonComplete:TNotifyEvent;
    protected
      Fparam:TObject;
      procedure execute; virtual; abstract;
    public
      constructor create(onComplete:TNotifyEvent;param:TObject);
    end;

    TThreadPool=class(TObjectQueue)
    private
      access:TcriticalSection;
      taskCounter:THandle;
      threadCount:integer;
    public
      constructor create(initThreads:integer);
      procedure addTask(aTask:TpooledTask);
    end;

    TpoolThread=class(Tthread)
    private
      FmyPool:TThreadPool;
    protected
      procedure Execute; override;
    public
      constructor create(pool:TThreadPool);
    end;

    implementation

    { TpooledTask }

    constructor TpooledTask.create(onComplete: TNotifyEvent; param: TObject);
    begin
      FonComplete:=onComplete;
      Fparam:=param;
    end;

    { TThreadPool }

    procedure TThreadPool.addTask(aTask: TpooledTask);
    begin
      access.acquire;
      try
        push(aTask);
      finally
        access.release;
      end;
      releaseSemaphore(taskCounter,1,nil); // release one unit to semaphore
    end;

    constructor TThreadPool.create(initThreads: integer);
    begin
      inherited create;
      access:=TcriticalSection.create;
      taskCounter:=createSemaphore(nil,0,maxInt,'');
      while(threadCount<initThreads) do
      begin
        TpoolThread.create(self);
        inc(threadCount);
      end;
    end;

    { TpoolThread }

    constructor TpoolThread.create(pool: TThreadPool);
    begin
      inherited create(true);
      FmyPool:=pool;
      FreeOnTerminate:=true;
      resume;
    end;

procedure TpoolThread.execute;
var thisTask:TpooledTask;
begin
  while (WAIT_OBJECT_0=waitForSingleObject(FmyPool.taskCounter,INFINITE)) do
  begin
    FmyPool.access.acquire;
    try
      thisTask:=TpooledTask(FmyPool.pop);
    finally
      FmyPool.access.release;
    end;
    thisTask.execute;
    if assigned(thisTask.FonComplete) then thisTask.FonComplete(thisTask);
  end;
end;

end.
妖妓 2025-01-05 08:18:22

您可以在请求队列中实现 FreeNotify 消息,当工作线程收到此消息时释放自己。在您的示例中,当您将线程数从 5 减少到 3 时,只需将 2 个 FreeNotify 消息放入队列中,然后 2 个工作线程将空闲。

You can implement FreeNotify Message in your request queue and when Worker Thread receieve this message free themselves. In your example when you decrease number of threads from 5 to 3 just put 2 FreeNotify messages in your queue and 2 worker threads will be free.

长不大的小祸害 2025-01-05 08:18:22

关于活动线程减少的问题:抱歉,但您只需自己决定。要么立即释放不需要的线程(尽早终止它们),要么让它们运行直到完成(所有工作完成后终止它们)。这是你的选择。当然,您必须将所需数量的变量与实际线程数的变量分开。更新线程变量的实际数量(可能只是 List.Count)的问题对于两者来说完全相同,因为任一解决方案都需要一些时间。

关于多线程的管理:您可以研究这个答案,它将线程存储在 TList 中。不过,它需要根据您的具体愿望清单进行一些调整,如果需要帮助,请大声喊叫。此外,当然还有更多可能的实现可以从默认 TThread 的使用中派生出来。请注意,存在其他(多)线程库,但我从未需要使用它们。

About your problem with the decrement of active threads: sorry, but you simply have to decide for yourself. Either free the unwanted threads immediately (which terminates them at the earliest possible moment), or let them run until they are finished (which terminates them after all work is done). It is your choice. Of course you have to separate the variable for the wished number from that of the actual number of threads. The problem for updating the actual number of threads variable (could simply be a List.Count) are for both exactly the same since either solution will require some time.

And on management of multiple threads: you can study this answer which stores the threads in a TList. It needs a little tweaking for you specific wish list though, please shout in case of need of assistance with that. Also, there are of course more possible implementations which can be derived from the use of the default TThread. And note that there exist other (multi)thread libraries, but I have never had the need to use them.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文