使用多线程优先的作业调度程序

发布于 2024-10-16 09:55:12 字数 513 浏览 5 评论 0原文

我即将启动一个创建作业调度程序的 C# .NET 4.0 项目。

  1. 该作业没有截止日期,并且可能会长时间运行,长达数天。
  2. 该作业有 3 个优先级:空闲、正常、关键;从最低到最高。
  3. 新的就业岗位不断被创造。
  4. 即使旧作业已创建很长时间,具有较高优先级的新作业也应优先于较低优先级的作业。
  5. 每个作业将由单个长时间运行的线程处理。
  6. 工作是可重入的。作业的状态会持久保存到数据库中,因此可以随时暂停作业或终止作业线程。

我的计划是使用信号量,并将并发条目数设置为系统核心数。队列中的每个作业都会创建一个新线程,并且所有线程一开始都会被信号量阻塞。

我的问题是保证当信号量调用release()方法时高优先级线程将首先进入信号量。可行吗?

我的第二个问题是,当更高优先级的作业线程存在时,让信号量内部的线程退出,并使退出的作业线程返回到线程队列以等待信号量。可行吗?

对于这两个问题,信号量是正确的方法吗?如果不是,你有什么建议?

I am about to start a C# .NET 4.0 project that creates a job scheduler.

  1. The job has no due date, and are potentially long running, up to days.
  2. The job has 3 priorities: idle, normal, critical; from lowest to highest.
  3. New job(s) are continuously being created.
  4. Newer job with higher priority should take precedence over lower priority job even if the old job has been created for a long time.
  5. Each job will be handled by single long running thread.
  6. Job are re-entrant. The state of job is persisted to the database, so it is okay to pause the job or terminate the job thread at anytime.

My plan is to use a semaphore, and set the number of concurrent entries to the number of system cores. A new thread will be created for every jobs in queue, and all thread will be blocked by the semaphore in the beginning.

My problem is to guarantee high priority thread will enter the semaphore first when semaphore calls release() method. Doable?

My second problem is to have a thread that is inside the semaphore to exit when higher priority job thread come to existence and have the exiting job thread go back to the thread queue to wait for the semaphore. Doable?

For those two problems, is semaphore the right approaches? If not what do you suggest?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

蓦然回首 2024-10-23 09:55:12

好吧,我更倾向于以下内容...

首先,启动您想要的所有线程:

for(int i=0; i < Environment.ProcessorCount; i++)
{
    Thread t = new Thread(RunWork);
    // setup thread
    t.Start();
    threads.Add(t);
}

您将需要一个接口来描述任务的优先级

interface ITask {
    PrioirtyType Prioirty { get; }
    bool Complete { get; }
    void PerformOneUnitOfWork();
}

然后创建一个队列管理对象。这显然会变得更加复杂,因为它可能需要与您的数据库同步等...

class MyQueue<TJob> where TJob : ITask 
{
    Queue<TJob> high, med, low;
    bool GetNextJob(ref TJob work)
    {
        if(work.Priority == PriorityType.High && !work.Complete)
            return true;
        lock(this)
        {
            if(high.Count > 0)
            {
                Enqueue(work);//requeue to pick back up later
                work = high.Dequeue();
                return true;
            }
            if(work.Priority == PriorityType.Med && !work.Complete)
                return true;
            if(med.Count > 0)
            {
                Enqueue(work);//requeue to pick back up later
                work = med.Dequeue();
                return true;
            }
            if(!work.Complete)
                return true;
            if(low.Count > 0)
            {
                work = low.Dequeue();
                return true;
            }
            work = null;
            return false;
        }

    void Enqueue(TJob work)
    {
        if(work.Complete) return;
        lock(this)
        {
            else if(work.Priority == PriorityType.High) high.Enqueue(work);
            else if(work.Priority == PriorityType.Med) med.Enqueue(work);
            else low.Enqueue(work);
        }
    }
}

最后创建您的工作线程,如下所示:

public void RunWork()
{
    ITask job;
    while(!_shutdown.WaitOne(0))
    {
        if(queue.GetNextJob(ref job))
            job.PerformOneUnitOfWork();
        else
            WaitHandle.WaitAny(new WaitHandle[] { _shutdown, queue.WorkReadyHandle });
    }
}

Well, I would lean more towards something like the following...

First, Start all the threads you want:

for(int i=0; i < Environment.ProcessorCount; i++)
{
    Thread t = new Thread(RunWork);
    // setup thread
    t.Start();
    threads.Add(t);
}

You will need an interface to describe the priority of a task

interface ITask {
    PrioirtyType Prioirty { get; }
    bool Complete { get; }
    void PerformOneUnitOfWork();
}

Then create a Queue management object. This would obviously get more complicated as it may need to sync with your database, etc...

class MyQueue<TJob> where TJob : ITask 
{
    Queue<TJob> high, med, low;
    bool GetNextJob(ref TJob work)
    {
        if(work.Priority == PriorityType.High && !work.Complete)
            return true;
        lock(this)
        {
            if(high.Count > 0)
            {
                Enqueue(work);//requeue to pick back up later
                work = high.Dequeue();
                return true;
            }
            if(work.Priority == PriorityType.Med && !work.Complete)
                return true;
            if(med.Count > 0)
            {
                Enqueue(work);//requeue to pick back up later
                work = med.Dequeue();
                return true;
            }
            if(!work.Complete)
                return true;
            if(low.Count > 0)
            {
                work = low.Dequeue();
                return true;
            }
            work = null;
            return false;
        }

    void Enqueue(TJob work)
    {
        if(work.Complete) return;
        lock(this)
        {
            else if(work.Priority == PriorityType.High) high.Enqueue(work);
            else if(work.Priority == PriorityType.Med) med.Enqueue(work);
            else low.Enqueue(work);
        }
    }
}

And Lastly create your worker thread something like the following:

public void RunWork()
{
    ITask job;
    while(!_shutdown.WaitOne(0))
    {
        if(queue.GetNextJob(ref job))
            job.PerformOneUnitOfWork();
        else
            WaitHandle.WaitAny(new WaitHandle[] { _shutdown, queue.WorkReadyHandle });
    }
}
傲鸠 2024-10-23 09:55:12

对于这两个问题,信号量是正确的方法吗?如果没有,你有什么建议?

这确实取决于。通常,每个线程最好有多个作业,因为许多(尤其是长时间运行的)工作项将花费时间等待 CPU 以外的其他事物。例如,如果您正在从事从 WCF 服务中提取数据的工作,或遇到其他相关问题,您可能会花费大量时间被阻塞和闲置。

在这种情况下,最好让您的作业根据需要进行安排。在这种情况下,使用 ThreadPool 可能会更好。

但是,如果作业都是高 CPU 资源,那么您的方法可能是可取的。优先级队列可用于跟踪调度优先级,并决定运行哪个作业。

话虽这么说,我可能不会为此使用信号量。虽然它可以工作,但单个计数器(通过 Interlocked.Increment/Decrement 管理)和 ManualResetEvent 也可以工作,并且重量要轻得多。

For those two problems, is semaphore the right approaches? If not what do you suggest?

It really depends. Often, it's better to have more than one job per thread, since many (especially long running) work items will spend their time waiting on things other than the CPU. For example, if you are doing work where you're pulling from WCF services, or other related issues, you may spend a lot of your time blocked and idle.

In this case, it might be better to just let your jobs schedule as needed. Using the ThreadPool may be nicer in that type of scenario.

If, however, the jobs are all high CPU, then your approach may be desirable. A priority queue could be used to track the scheduling priority, and decide which job to run.

That being said, I'd probably not use a semaphore for this. While it would work, a single counter (managed via Interlocked.Increment/Decrement) and a ManualResetEvent would work just as well, and be a lot lighter weight.

娇纵 2024-10-23 09:55:12

有一个很棒的框架从 Java 移植到了 C#,请查看

http://quartznet.sourceforge.net/

There is a great framework that was ported over from Java to C# check it out

http://quartznet.sourceforge.net/

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文