使用一组有限的线程定期触发类似的任务

发布于 2024-11-29 15:25:29 字数 1925 浏览 2 评论 0原文

我之前这里,但经过深思熟虑以及那些回答我的人的实现,我发现我的方法可能不正确。

当我实现上一个问题的解决方案时,出现了以下测试结果:

  1. 当我“模拟”在线程池中的多个线程上同时运行的多个任务时(通过使线程在随机时间从例如 1 到 20 秒),那么模型似乎工作得很好。我将系统设置为每 1 秒轮询一次,看看它是否可以生成另一个线程,一切看起来都很好。运行时间较长(睡眠)的线程将稍后完成,并且线程会到处启动和终止。如果我碰巧用完线程(我将其设置为生成不超过 10 个),它会坐下来等待一个可用的线程。
  2. 然而,当我让系统在每个线程中进行实际处理时(这将花费 3 秒以上的时间),这将涉及读取数据、生成保存数据的 XML、发送电子邮件等,系统将生成 1、2 或 3 个线程,进行处理,然后关闭线程(3...2...1...),然后说 0 个线程正在运行(我在各处添加了 console.writelines 来记录该过程)。然后它会挂在 0 个线程周围,不再选择任何工作!

所以我决定再次陈述我的问题,希望有人能找到解决方案。到目前为止我已经尝试了各种解决方案:

  1. ThreadPool:总是提到您不应该过度使用ThreadPool并且作业必须“快速”,但是“快速”的定义是什么?我如何知道线程池有多大/有多忙?
  2. 线程:总是说线程很昂贵,你必须处理它们的启动和结束,但是我如何限制它们,我尝试过信号量,“锁定”对象,公共变量,但没有任何效果

所以这里是我想要完成的:

  1. 我有同样的工作需要定期运行,即 gmail 会每 5 秒检查一次它的服务器是否有新电子邮件。
  2. 如果有工作要做(即您有新电子邮件要发送到收件箱),则生成一个异步线程并使其开始工作。这项工作通常需要比 (1) 中规定的间隔更长的时间,因此异步线程,如果间隔过去并且系统再次检查是否有新工作并查看您有更多工作,它将生成另一个线程并使其完成开始工作。
  3. 就像我的例子一样,所有的工作都是同一种工作(检查新邮件),并且完全相互独立,互不影响。如果其中一个失败,其余的可以继续工作,没有任何问题。
  4. 我需要限制可以拥有的并发线程数和最大线程数。如果我选择“10”,那么系统应该像 (1) 中那样开始检查作业,并像 (1) 中那样继续生成线程,直到达到 10 个线程。在某个时间间隔内生成新线程的所有新尝试都应该失败(不执行任何操作),直到再次释放线程。这里我想选择是:(a)当它被释放时,已经有一些工作排​​队等待分配给新的打开线程或(b)在下一个时间间隔检查是否有新工作并将其分配给新的打开线程线。
  5. 如果没有工作,那么通常系统应该坐下来等待,没有线程,本质上唯一应该运行的是某种计时器

我目前使用上一个问题中的示例来执行以下操作:

  1. 我启动一个计时器,每 1 秒滴答一次
  2. ,每次滴答时我 'ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork)'
  3. 在 DoWork II 中实例化一个类并调用执行某些工作的各种方法

...但这会导致我之前提到的只有 3 个线程消失,然后什么也没有。

我正在考虑执行以下操作:

  1. 将 ThreadPool 设置为 10 个线程,
  2. 启动一个计时器,并在每个时钟周期中启动 ThreadPool.QueueUserWorkItem',然后继续执行此操作,希望 ThreadPool 能够处理其他所有事情。这不是 ThreadPool 应该做的吗?

(对于所涉及的解释感到抱歉!)

I have asked a similar question before here, but after much thought, and implementations from those that answered me, I found that my approach might have been incorrect.

When I implement the solution given to me on this previous question the following test result appeared:

  1. When I 'simulate' multiple tasks running concurrently on multiple threads from the threadpool (by making the threads sleep at random times from 1 to 20 seconds for instance), then the model seems to work fine. I set the system to poll every 1 second to see if it can spawn another thread and all seems fine. Longer running (sleeping) threads would complete later on and threads would start and die all over the place. If I happen to run out of threads (I set it to spawn no more than 10) it would sit and wait for one to become available.
  2. When I however make the system do actual processing in each thread (which would take anything from 3 seconds upwards), which would involve reading data, generating XMLs saving data, sending emails and the like, the system would spawn 1, 2 or 3 threads, do processing and then just close the threads (3...2...1...) and then say 0 threads running (I added console.writelines everywhere to document the process). It would then hang around 0 threads, never picking any more work!

So I decided to state my issue again the hopes that someone has a solution. I have tried various solutions so far:

  1. ThreadPool: There's always the mention that you shouldn't over-work the ThreadPool and jobs has to be 'quick', but what is the definition of 'quick'? How do I know how big/busy the ThreadPool is?
  2. Threads: It's always stated that Threads are expensive and you have to handle them starting up and ending, but how do I limit them, I have tried Semaphores, 'lock' objects, public variables, but it no no avail

So here is what I would like to accomplish:

  1. I have the same job that needs to run at regular intervals, i.e. like gmail would check it's server for new email for you every 5 seconds.
  2. If there is work to be done (i.e. you have new emails to be sent to your inbox), then spawn an async thread and make it start the work. This work will typically take longer than the interval stated in (1), hence the async thread, if an interval passes and the system checks again to see if there's new work and see you have more work, it will spawn another thread and make it start the work.
  3. As in my example, all the jobs are the same kind of job (check of new mail), and are totally independent of eachother, they do not influence each other. If one of them fails, the rest can continue on working with no issue.
  4. I need there to be a limit of how many concurrent threads and maximum threads I can have. If I pick '10', then the system should start checking for jobs as in (1), and keep on spawning threads as in (1), until it reaches 10 threads. All new attempts on an interval to spawn a new thread should just fail (do nothing) until a thread is released again. Here I suppose the choice will be: (a) when it's released there will already be some work queued waiting to be given to the new open thread or (b) on the next interval check if there's new work and assign it to the new open thread.
  5. If there is no work, then typically the system should sit and wait, having no threads and in essence the only thing that should be running is some sort of timer

I currently use the sample in the previous question to do the following:

  1. I start a timer, that ticks every 1 sec
  2. On every tick I 'ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork)'
  3. In DoWork I I instantiate a class and call various methods that does some work

...but this leads to what I mentioned before, only 3 threads that die off and then nothing.

I as thinking of doing the following:

  1. Set the ThreadPool to 10 thread's
  2. Start a timer and in each tick ThreadPool.QueueUserWorkItem', and just keep on doing this, hoping that the ThreadPool will handle everything else. Isn't this what the ThreadPool is supposed to do?

Any help will be fantastic! (Sorry for the involved explanation!)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

≈。彩虹 2024-12-06 15:25:29

尝试查看 Semaphore 类。您可以使用它来设置可以同时访问特定资源的线程数量的限制(当我说资源时,它可以是任何东西)。

好的,编辑详细信息:

在管理线程的类中,您创建:

Semaphore concurrentThreadsEnforcer = new Semaphore(value1, value2);

然后,您启动的每个线程将调用:

concurrentThreadsEnforcer.WaitOne();

这将从信号量中获取一个槽并将其提供给新线程,或者阻塞新线程直到一个槽变得可用。

每当你的新线程完成其工作时,他(我喜欢个性化)必须调用,原因很明显:

concurrentThreadsEnforcer.Release().

现在,关于构造函数,第二个参数相当简单:说明在任何给定时间有多少并发线程可以访问资源。

第一个有点棘手。第二个参数和第一个参数之间的差异将说明为调用线程保留多少个信号量槽。也就是说,所有新生成的线程都可以访问第一个参数指定的槽数,而剩下的直到第二个参数的值将保留给创建该线程的原始线程。信号量(调用线程)。

在您的情况下,对于 10 个最大线程,您可以使用:

... = new Semaphore(10, 10);

因为我无论如何都发布了一个故事,所以让我提供更多详细信息。

我将在新线程中执行此操作的方式如下:

bool aquired = false;
try
{
    aquired = concurrentThreadsEnforcer.WaitOne();

    // Do some work here
} // Optional catch statements
finally
{
    if (aquired)
        concurrentThreadsEnforcer.Release();;
}

Try to have a look at the Semaphore class. You can use that to set a limit to how many threads can concurrently access a particular resource (and when I say resource, it can be anything).

Ok, edited for details:

In your class managing the threads, you create:

Semaphore concurrentThreadsEnforcer = new Semaphore(value1, value2);

Then, each thread you start will call:

concurrentThreadsEnforcer.WaitOne();

That will either take one slot from the semaphore and give it to the new thread, or block the new thread until a slot becomes available.

Whenever your new thread finishes its work, he (I like personalizing) MUST call, for obvious reasons:

concurrentThreadsEnforcer.Release().

Now, regarding the constructor, the second parameter is fairly simple: states how many concurrent threads can access the resource at any given time.

The first one is a bit trickier. The difference between the second parameter and the first one will state how many semaphore slots are reserved for the calling thread. That is, all your newly spawned threads will have access to the number of slots stated by the first parameter, and the rest of them up to the second parameter's value will be reserved for the original thread that created the semaphore (calling thread).

In your case, for 10 max threads, you would use:

... = new Semaphore(10, 10);

Since I posted a story anyway, let me gibe more details.

The way I will do it in the new threads, will be like this:

bool aquired = false;
try
{
    aquired = concurrentThreadsEnforcer.WaitOne();

    // Do some work here
} // Optional catch statements
finally
{
    if (aquired)
        concurrentThreadsEnforcer.Release();;
}
流星番茄 2024-12-06 15:25:29

我会使用 BlockingCollectionParallel.ForEach 的组合,

如下所示:

private BlockingCollection<Job> jobs = new BlockingCollection<Job>();
private Task jobprocessor;

public void StartWork() {
    timer.Start();
    jobprocessor = Task.Factory.StartNew(RunJobs);
}

public void EndWork() {
    timer.Stop();
    jobs.CompleteAdding();
    jobprocessor.Wait();
}

public void TimerTick() {
   var job = new Job();
   if (job.NeedsMoreWork())
       jobs.Add(job);
}

public void RunJobs() {
    var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };
    Parallel.ForEach(jobs.GetConsumingPartitioner(), options,
                     job => job.DoSomething());
}

I would use a combination of BlockingCollection and Parallel.ForEach

Something like this:

private BlockingCollection<Job> jobs = new BlockingCollection<Job>();
private Task jobprocessor;

public void StartWork() {
    timer.Start();
    jobprocessor = Task.Factory.StartNew(RunJobs);
}

public void EndWork() {
    timer.Stop();
    jobs.CompleteAdding();
    jobprocessor.Wait();
}

public void TimerTick() {
   var job = new Job();
   if (job.NeedsMoreWork())
       jobs.Add(job);
}

public void RunJobs() {
    var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };
    Parallel.ForEach(jobs.GetConsumingPartitioner(), options,
                     job => job.DoSomething());
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文