Timer Efficiency

Published 2024-09-27 19:57:13

I am planning to develop a system with tens of thousands of objects in it, each of which will have up to 42 (but more likely around 4 or 5) separate actions that it may perform at regular intervals. I also plan to write code that will deactivate the timers until the object comes into use. When idle, each object will only need 1 timer, but when active, the other timers will all start at once. At first the number of objects will be small, maybe a few hundred, but I expect it to grow exponentially and, within a few months, to reach into the tens of thousands.

So, I am very worried about the efficiency of the code I will be writing for the timers and for these objects. There are three levels at which I could write this application, all of which would successfully perform the required tasks. Also, I plan to run this system on a quad-core server, so I would like to make use of multi-threading wherever possible.

To this end, I've decided to use the System.Timers.Timer class, which raises each Elapsed event on a thread-pool thread.

These are the 3 levels I am considering:

  1. A single timer operates the entire application: it iterates through each object, checks whether any actions need to be fired, runs them if so, and then moves on to the next object.

  2. A multi-tier design, where each object has a master timer that checks all of the functions the object could need to perform, runs any that are ready, and then sets the timer's next interval to the next required action time.

  3. A recursive-tier design, where each action in each object has its own timer that fires and is then rescheduled for the next time it needs to run.

The problem with option 1 is that, with so many objects and actions, a single timer elapse handled this way could run for 20+ seconds (while it executes a few million lines of looped code), when it should probably be ticking every second. If the objects aren't kept in sync, the system would likely not work well.

The problem with option 2 is that it would be a little harder to write than option 3, though not by much. It would also mean perhaps 10,000+ timers running on the system (one for each object), creating and destroying threads with each elapse like it's nobody's business (I'm not sure whether that is a problem or not). Each timer would have to fire at least once per second in this situation, running perhaps a few hundred lines of code (up to perhaps a thousand in an extreme case).

The problem with option 3 is the sheer number of timers that could potentially be introduced into the system: an average of 10,000+ timers, with the potential for nearly 100,000 timers running at the same time. Each elapse event may only have to run 50 or fewer lines of code, though, which makes them very short. The intervals between elapse events would range from a hundredth of a second at one extreme to five minutes at the other, with the average likely around 1 second.

I am proficient in Visual Basic .NET, and was planning to write it in that, but I could also revert to my high-school days and try to write this in C++ for efficiency if it would make that much of a difference (please let me know if you have any sources on code efficiency between languages). I'm also toying with the notion of running this on a clustered Linux server instead of my quad-core Windows server, but I'm not sure whether I could get any of my .NET apps to run on a Linux cluster like that (I would love any info on that as well).

The main question to answer for this topic is:

Do I use option 1, 2, or 3, and why?

~Edit after considering comments~

So, here is the 4th option, involving a timer wheel with a spin loop. Here is the job class:

Public Class Job
    Private dFireTime As DateTime
    Private objF As CrossAppDomainDelegate
    Private objParams() As Object

    Public Sub New(ByVal Func As CrossAppDomainDelegate, ByVal Params() As Object, ByVal FireTime As DateTime)
        objF = Func
        dFireTime = FireTime
        objParams = Params
    End Sub

    Public ReadOnly Property FireTime() As DateTime
        Get
            Return dFireTime
        End Get
    End Property

    Public ReadOnly Property Func() As CrossAppDomainDelegate
        Get
            Return objF
        End Get
    End Property

    Public ReadOnly Property Params() As Object()
        Get
            Return objParams
        End Get
    End Property
End Class

And then the main loop implementation:

Private Tasks As New LinkedList(Of Job)

Private Sub RunTasks()
    While True
        Dim CurrentTime As DateTime = DateTime.Now

        ' LinkedList has no indexer, so the head job is Tasks.First.Value.
        ' Run it once its fire time has passed (not before).
        If Tasks.Count <> 0 AndAlso Tasks.First.Value.FireTime <= CurrentTime Then
            Dim T As Job = Tasks.First.Value
            Tasks.RemoveFirst()
            T.Func.Invoke()
        ElseIf Tasks.Count <> 0 Then
            ' For longer waits, sleep; waits of 30 ms or less fall through and spin
            Dim MillisecondDif As Double = Tasks.First.Value.FireTime.Subtract(CurrentTime).TotalMilliseconds

            If MillisecondDif > 30 Then
                Threading.Thread.Sleep(CInt(MillisecondDif))
            End If
        Else
            ' Nothing scheduled yet; yield briefly instead of spinning flat out
            Threading.Thread.Sleep(1)
        End If

    End While
End Sub

Do I have it right?

EpicClanWars.com

~Edit 2~

Switched the word "Task" out for "Job" so ppl could stop complaining about it ;)

~Edit 3~

Added variables for tracking time & ensuring spinloops happen when needed

Comments (4)

木森分化 2024-10-04 19:57:13

EDIT: I remember an interesting interview that is definitely worth viewing: Arun Kishan: Inside Windows 7 - Farewell to the Windows Kernel Dispatcher Lock

As @Steven Sudit stated, I warn again: use this only as a demo of how a timer wheel works and of some of the issues you have to take care of while implementing one, not as a reference implementation. In the real world you have to write far more complex logic to take into account available resources, scheduling policy, and so on.


Here are the good points raised by Steven Sudit (read the post comments for details):

1) Choose the right structure to keep your jobs list in (as usual, it depends):

  • SortedList<> (or SortedDictionary<>) is good on memory consumption and indexing, but you have to implement synchronized access

  • ConcurrentQueue<> will help you avoid locking, but you have to implement ordering yourself. It is also very memory-efficient

  • LinkedList<> is good for insertion and retrieval (we only need the head anyway) but requires synchronized access (though that is easily implemented lock-free) and is not as memory-efficient, since it stores two references per node (prev/next). That becomes an issue when you have millions of jobs, because together they take a significant amount of memory.

But:

I totally agree with @Steven:

It doesn't matter: neither one of these is a good fit. The right answer would be to use a regular queue and maintain its order yourself, since we most often need to access it only from the head or tail.

Generally, I would recommend using the most feature-full collection from the library, but that doesn't apply here because this is system-level code. We'd need to roll our own, either from scratch or on top of a less feature-rich collection
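
As a rough illustration of what "maintain its order yourself" can look like, here is a sketch of an ordered insert on top of a plain LinkedList. The element type is simplified to bare DateTime fire times and the OrderedInsert name is illustrative only; this is not part of the answer's code.

    using System;
    using System.Collections.Generic;

    // Sketch: keep jobs in a plain LinkedList ordered by fire time by inserting
    // each new entry just before the first node that fires later than it does.
    public static class OrderedInsert
    {
        public static void Add(LinkedList<DateTime> queue, DateTime fireTime)
        {
            var node = queue.First;
            while (node != null && node.Value <= fireTime)
                node = node.Next;

            if (node == null) queue.AddLast(fireTime);   // latest job goes to the tail
            else queue.AddBefore(node, fireTime);        // otherwise insert before the later one
        }
    }

With the soonest job always at the head, the dispatcher only ever needs to look at queue.First.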

2) To simplify the processing logic for simultaneous jobs, you can add a delegate list (e.g. via ConcurrentQueue, to make it lock-free) to the original Job class, so that when you need another job at the same time you just add another delegate to start.

@Steven:

If two tasks are actually scheduled for the same time (whether actually or effectively), this is a normal case that does not require complicating our data structure. In other words, we don't need to group up simultaneous jobs so that we have to traverse two different collections; we can just make them adjacent

3) Starting/stopping the dispatcher is not as straightforward as it could be and so can lead to errors. Instead, you can wait on an event while using a timeout.

@Steven:

This way, it will either wake up when the next job is ready or when a new job is inserted before the head. In the latter case, it could need to run it now or set a different wait. If presented with, say, 100 jobs all scheduled for the same instant, the best thing we can do is queue them all up.

If we needed to provide prioritization, that's a job for a priority dispatch queue and multiple pools in a producer/consumer relationship, but it still doesn't justify a start/stop dispatcher. The dispatcher should always be on, running in a single loop that sometimes cedes the core

4) About using ticks:

@Steven:

Sticking to one type of tick is fine, but mixing and matching gets ugly, particularly since it's hardware-dependent. I'm sure that ticks would be slightly faster than milliseconds, because it stores the former and has to divide by a constant to get the latter. Whether this operation winds up being costly is another matter, but I'm fine with using ticks to avoid the risk.

My thoughts:

Another good point; I agree with you. Sometimes division by a constant becomes costly and is not as fast as it may seem, but when we are talking about 100,000 DateTimes it does not matter. You are right, thank you for pointing it out.
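
For reference, a minimal sketch of the conversion being discussed: DateTime stores time as 100-nanosecond ticks, so turning a tick delta into milliseconds is a single division by TimeSpan.TicksPerMillisecond.

    using System;

    class TickDemo
    {
        static void Main()
        {
            DateTime start = DateTime.UtcNow;
            DateTime fire  = start.AddSeconds(10);

            // Delta kept in ticks (100 ns units) - no conversion needed
            long deltaTicks = fire.Ticks - start.Ticks;

            // Converting to milliseconds is a division by a constant (10,000)
            long deltaMs = deltaTicks / TimeSpan.TicksPerMillisecond;

            Console.WriteLine("delta = {0} ticks = {1} ms", deltaTicks, deltaMs);
        }
    }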

5) "Managing resources":

@Steven:

The problem I'm trying to highlight is that the call to GetAvailableThreads is expensive and naive; the answer is obsolete before you can even use it. If we really wanted to keep track, we could get initial values and keep a running count by calling the job from a wrapper that uses Interlocked.Increment/Decrement. Even then, it presumes that the rest of the program is not using the thread pool. If we really wanted fine control, then the right answer here is to roll our own thread pool

I absolutely agree that calling GetAvailableThreads is a naive way to monitor available resources, though the call itself (via CorGetAvailableThreads) is not so expensive. I wanted to demonstrate that resources need to be managed, and it seems I chose a bad example.

What is provided in the source code example must by no means be treated as the right way to monitor available resources. I just want to demonstrate that you have to think about it, though the example code is perhaps not written all that well.
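
For illustration, here is a minimal sketch of the running-count approach Steven describes: wrap each queued job so an Interlocked counter tracks in-flight work instead of polling GetAvailableThreads. The CountedPool name and its members are hypothetical, not part of the code below.

    using System;
    using System.Threading;

    // Hypothetical wrapper: counts how many scheduled jobs are currently in flight
    // by incrementing before queuing and decrementing when the job finishes.
    public static class CountedPool
    {
        private static int _inFlight;

        public static int InFlight
        {
            get { return Thread.VolatileRead(ref _inFlight); }
        }

        public static void Queue(WaitCallback job, object state)
        {
            Interlocked.Increment(ref _inFlight);
            ThreadPool.QueueUserWorkItem(s =>
            {
                try { job(s); }
                finally { Interlocked.Decrement(ref _inFlight); }
            }, state);
        }
    }

This only tracks jobs queued through the wrapper; as Steven notes, it still assumes the rest of the program is not also using the thread pool.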

6) Using Interlocked.CompareExchange:

@Steven:

No, it's not a common pattern. The most common pattern is to briefly lock. Less common is to flag the variable as volatile. Much less common would be to use VolatileRead or MemoryBarrier. Using Interlocked.CompareExchange this way is obscure, even if Richter does it. using it without an explanatory comment is absolutely guaranteed to confuse, as the word "Compare" implies that we're doing a comparison, when in fact we're not.

You are right; I should have commented on its usage.


using System;
using System.Threading;

// Job.cs

// WARNING! Your jobs (tasks) have to be ASYNCHRONOUS, or at least really short-lived,
// or it will ruin the whole design and the ThreadPool usage, because heavy concurrency could exhaust the available worker threads

// BTW, the number of worker threads != the number of jobs scheduled via the ThreadPool:
// a job may wait on I/O (via an async Begin/End call) at some point
// and so free its worker thread for another waiting runner

// If you can't meet these requirements, just use the usual Thread class,
// but you will lose all of the ThreadPool's advantages and get noticeable overhead

// Read http://msdn.microsoft.com/en-us/magazine/cc164139.aspx for some details

// I named class "Job" instead of "Task" to avoid confusion with .NET 4 Task 
public class Job
{
    public DateTime FireTime { get; private set; }

    public WaitCallback DoAction { get; private set; }
    public object Param { get; private set; }

    // Please use UTC datetimes to avoid timezone problems
    // Also consider _never_ using DateTime.Now in repeated tasks, because it is significantly slower
    // than DateTime.UtcNow (it uses the local TimeZone and converts the time accordingly)

    // Here we always work with UTC
    // It will save you a lot of time when your project gets jobs (tasks) posted from different timezones
    public static Job At(DateTime fireTime, WaitCallback doAction, object param = null)
    {
        return new Job {FireTime = fireTime.ToUniversalTime(), DoAction = doAction, Param = param};
    }

    public override string ToString()
    {
        return string.Format("{0}({1}) at {2}", DoAction != null ? DoAction.Method.Name : string.Empty, Param,
                             FireTime.ToLocalTime().ToString("o"));
    }
}

 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Linq;
 using System.Threading;

// Dispatcher.cs

// Take a look at System.Runtime IOThreadTimer.cs and IOThreadScheduler.cs
// in Microsoft Reference Source, its interesting reading

public class Dispatcher
{
    // Tasks need to be sorted by fire time. I use Ticks as the key to gain some speed during checks
    // There may be more than one task scheduled for the same time
    private readonly SortedList<long, List<Job>> _jobs;

    // Synchronization object to access _jobs (and _timer) and make it thread-safe
    // See comment in ScheduleJob about locking
    private readonly object _syncRoot;

    // Queue (RunJobs method) is running flag
    private int _queueRun;

    // Flag to prevent pollute ThreadPool with many times scheduled JobsRun
    private int _jobsRunQueuedInThreadPool;

    // I'll use Stopwatch to measure the elapsed interval. It is a wrapper around QueryPerformanceCounter
    // and does not consume any additional OS resources to count

    // Used to check how many OS ticks (not DateTime.Ticks!) have elapsed already
    private readonly Stopwatch _curTime;

    // Scheduler start time. It is used to build the time delta for a job
    private readonly long _startTime;

    // System.Threading.Timer used to schedule the next wake-up time
    // You have to implement synchronized access, as it is not thread-safe
    // http://msdn.microsoft.com/en-us/magazine/cc164015.aspx
    private readonly Timer _timer;

    // Minimum timer increment for scheduling the next call via the timer instead of the ThreadPool
    // Read http://www.microsoft.com/whdc/system/pnppwr/powermgmt/Timer-Resolution.mspx
    // By default it is around 15 ms
    // If you want to know it exactly, use GetSystemTimeAdjustment via interop ( http://msdn.microsoft.com/en-us/library/ms724394(VS.85).aspx )
    // You want the TimeIncrement parameter from there
    private const long MinIncrement = 15 * TimeSpan.TicksPerMillisecond;

    // Maximum scheduled jobs allowed per queue run (specify your own suitable value!)
    // The scheduler will add no more than this many jobs to the ThreadPool queue (and hence count them as processed)

    // This is a balance between how quickly a job is scheduled after its time has elapsed on one side, and
    // how long the jobs list stays locked while RunJobs owns its ThreadPool thread on the other
    private const int MaxJobsToSchedulePerCheck = 10;

    // Queue length
    public int Length
    {
        get
        {
            lock (_syncRoot)
            {
                return _jobs.Count;
            }
        }
    }

    public Dispatcher()
    {
        _syncRoot = new object();

        _timer = new Timer(RunJobs);

        _startTime = DateTime.UtcNow.Ticks;
        _curTime = Stopwatch.StartNew();

        _jobs = new SortedList<long, List<Job>>();
    }


    // Is the dispatcher still working?
    // Warning! The queue ends its work when there are no more jobs to schedule, but already-started jobs can still be running
    public bool IsWorking()
    {
        return Interlocked.CompareExchange(ref _queueRun, 0, 0) == 1;
    }

    // Just a handy method to get the current jobs list
    public IEnumerable<Job> GetJobs()
    {
        lock (_syncRoot)
        {
            // We copy original values and return as read-only collection (thread-safety reasons)
            return _jobs.Values.SelectMany(list => list).ToList().AsReadOnly();
        }
    }

    // Add job to scheduler queue (schedule it)
    public void ScheduleJob(Job job)
    {
        // WARNING! This will introduce a bottleneck if you have heavy concurrency.
        // You have to implement a lock-free solution to avoid the bottleneck, but that is another complex topic.
        // You can also avoid the lock by using Jeffrey Richter's ReaderWriterGateLock (http://msdn.microsoft.com/en-us/magazine/cc163532.aspx)
        // but it can introduce significant delay under heavy load (due to the nature of the ThreadPool).
        // I recommend implementing or reusing a suitable lock-free algorithm.
        // That is the best solution under heavy concurrency (if you have to schedule a large enough job count per second);
        // otherwise a lock, or maybe ReaderWriterLockSlim, is cheap enough
        lock (_syncRoot)
        {
            // We shift by the start time so we can quickly check when it passes our _curTime
            var shiftedTime = job.FireTime.Ticks - _startTime;

            List<Job> jobs;
            if (!_jobs.TryGetValue(shiftedTime, out jobs))
            {
                jobs = new List<Job> {job};
                _jobs.Add(shiftedTime, jobs);
            }
            else jobs.Add(job);


            if (Interlocked.CompareExchange(ref _queueRun, 1, 0) == 0)
            {
                // Queue not run, schedule start
                Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0);
                ThreadPool.QueueUserWorkItem(RunJobs);
            }
            else 
            {
                // else the queue is already up and running, but maybe we need to adjust the wake-up time
                // See the detailed comment in RunJobs

                long firetime = _jobs.Keys[0];
                long delta = firetime - _curTime.Elapsed.Ticks;

                if (delta < MinIncrement)
                {
                    if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
                    {
                        _timer.Change(Timeout.Infinite, Timeout.Infinite);
                        ThreadPool.QueueUserWorkItem(RunJobs);
                    }
                }
                else 
                {
                    Console.WriteLine("DEBUG: Wake up time changed. Next event in {0}", TimeSpan.FromTicks(delta));
                    _timer.Change(delta/TimeSpan.TicksPerMillisecond, Timeout.Infinite);
                }
            }

        }
    }


    // Job runner
    private void RunJobs(object state)
    {
        // Warning! Here I lock the list until the entire pass is done;
        // it may be better to use ReaderWriterLockSlim or something else (e.g. lock-free),
        // but as usual "it depends..."

        // Processing here is really fast (only a few operations), so unless you have to schedule many jobs per second it does not matter
        lock (_syncRoot)
        {
            // We are ready to re-run RunJobs if needed
            Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 0, 1);

            int availWorkerThreads;
            int availCompletionPortThreads;

            // Current thread stats
            ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);

            // You can check max thread limits by
            // ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);

            int jobsAdded = 0;

            while (jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1 && _jobs.Count > 0)
            {
                // SortedList<> implemented as two arrays for keys and values so indexing on key/value will be fast
                // First element
                List<Job> curJobs = _jobs.Values[0];
                long firetime = _jobs.Keys[0];

                // WARNING! Stopwatch ticks are different from DateTime.Ticks
                // so we use _curTime.Elapsed.Ticks instead of _curTime.ElapsedTicks

                // Each tick in the DateTime.Ticks value represents one 100-nanosecond interval. 
                // Each tick in the ElapsedTicks value represents the time interval equal to 1 second divided by the Frequency.
                if (_curTime.Elapsed.Ticks <= firetime) break;

                while (curJobs.Count > 0 &&  jobsAdded < MaxJobsToSchedulePerCheck && availWorkerThreads > MaxJobsToSchedulePerCheck + 1)
                {
                    var job = curJobs[0];

                    // Time has elapsed and we are ready to start the job
                    if (job.DoAction != null)
                    {
                        // Schedule a new run

                        // I strongly recommend looking at the new .NET 4 Task class, because it gives a superior solution for managing tasks,
                        // e.g. cancellation, exception handling, continuations, etc.
                        ThreadPool.QueueUserWorkItem(job.DoAction, job);
                        ++jobsAdded;

                        // It may seem that we could just decrease availWorkerThreads by 1,
                        // but don't forget about already-started jobs; they can also consume ThreadPool threads
                        ThreadPool.GetAvailableThreads(out availWorkerThreads, out availCompletionPortThreads);
                    }

                    // Remove job from list of simultaneous jobs
                    curJobs.Remove(job);
                }

                // Remove the whole list if it's empty
                if (curJobs.Count < 1) _jobs.RemoveAt(0);
            }

            if (_jobs.Count > 0)
            {
                long firetime = _jobs.Keys[0];

                // Time to next event
                long delta = firetime - _curTime.Elapsed.Ticks; 

                if (delta < MinIncrement) 
                {
                    // Schedule the next queue check via the ThreadPool (immediately)
                    // It may seem that we would consume all resources when we run out of available threads (due to "infinite" rescheduling),
                    // because we fall through our while loop and just reschedule RunJobs,
                    // but that is not the case: before RunJobs is started again,
                    // the other threads will advance a bit and may even complete their tasks,
                    // so it is safe to just reschedule RunJobs and thereby wait until we get some resources
                    if (Interlocked.CompareExchange(ref _jobsRunQueuedInThreadPool, 1, 0) == 0)
                    {
                        _timer.Change(Timeout.Infinite, Timeout.Infinite);
                        ThreadPool.QueueUserWorkItem(RunJobs);
                    }
                }
                else // Schedule next check via timer callback
                {
                    Console.WriteLine("DEBUG: Next event in {0}", TimeSpan.FromTicks(delta)); // just some debug output
                    _timer.Change(delta / TimeSpan.TicksPerMillisecond, Timeout.Infinite);
                }
            }
            else // Shutdown the queue, no more jobs
            {
                Console.WriteLine("DEBUG: Queue ends");
                Interlocked.CompareExchange(ref _queueRun, 0, 1); 
            }
        }
    }
}

Quick example of usage:

    // Test job worker
    static void SomeJob(object param)
    {
        var job = param as Job;
        if (job == null) return;

        Console.WriteLine("Job started: {0}, [scheduled to: {1}, param: {2}]", DateTime.Now.ToString("o"),
                          job.FireTime.ToLocalTime().ToString("o"), job.Param);
    }

    static void Main(string[] args)
    {
        var curTime = DateTime.UtcNow;
        Console.WriteLine("Current time: {0}", curTime.ToLocalTime().ToString("o"));
        Console.WriteLine();

        var dispatcher = new Dispatcher();

        // Schedule +10 seconds to future
        dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:1"));
        dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(10), SomeJob, "+10 sec:2"));

        // Starts almost immediately
        dispatcher.ScheduleJob(Job.At(curTime - TimeSpan.FromMinutes(1), SomeJob, "past"));

        // And last job to test
        dispatcher.ScheduleJob(Job.At(curTime + TimeSpan.FromSeconds(25), SomeJob, "+25 sec"));

        Console.WriteLine("Queue length: {0}, {1}", dispatcher.Length, dispatcher.IsWorking()? "working": "done");
        Console.WriteLine();

        foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);
        Console.WriteLine();

        Console.ReadLine();

        Console.WriteLine(dispatcher.IsWorking()?"Dispatcher still working": "No more jobs in queue");

        Console.WriteLine();
        foreach (var job in dispatcher.GetJobs()) Console.WriteLine(job);

        Console.ReadLine();
    }

Hope it will be helpful.


@Steven Sudit pointed out some issues to me, so here I try to give my view.

1) I wouldn't recommend using SortedList here, or anywhere else, as it's an obsolete .NET 1.1 class

SortedList<> is by no means obsolete. It still exists in .NET 4.0 and was introduced in .NET 2.0, when generics were added to the language. I can't see any reason for it to be removed from .NET.

But the real question I am trying to answer here is: what data structure can store values in sorted order and be efficient at storing and indexing them? There are two suitable ready-to-use data structures: SortedDictionary<> and SortedList<> (there is information around on how to choose between them). I just don't want to clutter the implementation with my own code and hide the main algorithm. I could implement a priority array or something else here, but it would take more lines of code. I don't see any reason not to use SortedList<> here...

BTW, I can't understand why you don't recommend it. What are the reasons?

2) In general, there's no need to complicate the code with special cases for simultaneous events.

When @Jrud says he will probably have numerous tasks to schedule, I assume they may involve heavy concurrency, so I demonstrated how to handle it. But my point is: even with low concurrency you still have a chance of getting events at the same time, and this is easily possible in a multithreaded environment or when many sources want to schedule jobs.

The Interlocked functions are not that complicated, they are cheap, and since .NET 4.0 they are inlined, so there is no problem adding a guard for this situation.

3) The IsWorking method should just use a memory barrier and then read the value directly.

I'm not so sure you are right here. I would recommend reading two nice articles: Part 4: Advanced Threading of Threading in C# by Joseph Albahari and How Do Locks Lock? by Jeff Moser. And of course Chapter 28 (Primitive Thread Synchronization Constructs) of CLR via C# (3rd edition) by Jeffrey Richter.

Here are a couple of quotes:

The MemoryBarrier method doesn't access memory but it forces any earlier program-order loads and stores to be completed before the call to MemoryBarrier. And it also forces any later program-order loads and stores to be completed after the call to MemoryBarrier. MemoryBarrier is much less useful than the other two methods.

Important: I know that this can be very confusing, so let me summarize it as a simple rule: When threads are communicating with each other via shared memory, write the last value by calling VolatileWrite and read the first value by calling VolatileRead.

I would also recommend: Intel® 64 and IA-32 Architectures Software Developer's Manuals if you care about it seriously.

So I use neither VolatileRead/VolatileWrite nor the volatile keyword in my code, and I don't think Thread.MemoryBarrier would be better here. Maybe you can point out what I am missing? Some articles or an in-depth discussion?
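
For illustration only, here are the two flavours of the read side by side: the CompareExchange form used in IsWorking, and the Thread.VolatileRead alternative Steven is suggesting. The Flags class is hypothetical, not part of the Dispatcher above.

    using System.Threading;

    public class Flags
    {
        private int _queueRun; // 1 = dispatcher loop is running, 0 = idle

        // Form used in the answer: CompareExchange with identical value and comparand
        // never changes the field, it just returns the current value atomically.
        public bool IsWorkingViaCompareExchange()
        {
            return Interlocked.CompareExchange(ref _queueRun, 0, 0) == 1;
        }

        // Alternative Steven suggests: a volatile read of the same field.
        public bool IsWorkingViaVolatileRead()
        {
            return Thread.VolatileRead(ref _queueRun) == 1;
        }
    }

Both reads return the current value with an appropriate fence; the CompareExchange form simply never stores anything because the value and the comparand are the same.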

4) The GetJobs method looks like it could lock for an extended period. Is it necessary?

First of all, it's just a handy method; sometimes it is necessary to get all the tasks in the queue, at least for debugging.

But you are not right. As I mentioned in the code comments, SortedList<> is implemented as two arrays; you can check this in the Reference Source or just by viewing it in Reflector. Here is a comment from the reference source:

// A sorted list internally maintains two arrays that store the keys and
// values of the entries.  

I took this from .NET 4.0, but it has not changed much since 2.0-3.5.

So my code:

_jobs.Values.SelectMany(list => list).ToList().AsReadOnly();

involves the following:

  • iterating through the values in an array of references to List; indexing an array is very fast
  • iterating through each List (which is internally implemented as an array too); this is also very fast
  • building a new List of references (via ToList()), which is also very fast (just a dynamic array; .NET has a very solid and fast implementation)
  • building a read-only wrapper (no copy, just an iterator wrapper)

So consequently we just get a flattened, read-only list of references to the Job objects. It is very fast even if you have millions of tasks; try measuring it yourself.

Anyway, I added it to show what happens during the execution cycle (for debugging purposes), and I think it can be useful.

5) A lock-free queue is available in .NET 4.0.

I would recommend reading Patterns of Parallel Programming by Stephen Toub and Thread-safe Collections in .NET Framework 4 and Their Performance Characteristics; there are many other interesting articles on this as well.

So I quote:

ConcurrentQueue(T) is a data structure in .NET Framework 4 that provides thread-safe access to FIFO (First-In First-Out) ordered elements. Under the hood, ConcurrentQueue(T) is implemented using a list of small arrays and lock-free operations on the head and tail arrays, hence it is quite different than Queue(T) which is backed by an array and relies on the external use of monitors to provide synchronization. ConcurrentQueue(T) is certainly more safe and convenient than manual locking of a Queue(T) but some experimentation is required to determine the relative performance of the two schemes. In the remainder of this section, we will refer to a manually locked Queue(T) as a self-contained type called SynchronizedQueue(T).

It doesn't have any methods for maintaining an ordered queue, and neither do any of the new thread-safe collections; they all maintain unordered collections. But reading the original description from @Jrud, I think we have to maintain a list ordered by the time at which each task needs to fire. Am I wrong?

6) I wouldn't bother starting and stopping the dispatcher; just let it sleep until the next job

Do you know a good way to make a ThreadPool thread sleep? How would you implement it?

I think the dispatcher goes to "sleep" when it is not processing any tasks, and scheduling a job wakes it up. In any case there is no special processing to put it to sleep or wake it up, so in my view this process amounts to "sleep".

If you are telling me that I should just keep rescheduling RunJobs via the ThreadPool when no jobs are available, then you are wrong: it would eat too many resources and could impact already-started jobs. Try it yourself. Why do unnecessary work when we can easily avoid it?

7) Rather than worrying about different kinds of ticks, you could just stick to milliseconds.

You are not right. Either you stick to ticks or you don't care about it at all. Check the DateTime implementation: each access to the Millisecond property involves converting the internal representation (in ticks) to milliseconds, including a division. This can hurt performance on old (Pentium-class) computers (I measured it myself, and you can too).

In general, though, I agree with you. We don't care about the representation here because it does not give us a noticeable performance boost.

It is just a habit of mine. I processed billions of DateTimes in a recent project, so I coded accordingly. In that project there was a noticeable difference between processing by ticks and by the other components of DateTime.

8) The attempt to keep track of available threads does not seem likely to be effective

I just wanted to demonstrate that you have to care about it. In the real world you have to implement scheduling and resource-monitoring logic that goes far beyond my straightforward version.

I wanted to demonstrate the timer wheel algorithm and point out some of the problems the author will have to think about when implementing it.

You are absolutely right that I have to warn about this. I thought a "quick prototype" would be enough. My solution can by no means be used in production.

浅紫色的梦幻 2024-10-04 19:57:13

None of the above. The standard solution is to keep a list of events, such that each one points to the next one to occur. You then use a single timer and have it wake up only in time for the next event.

edit

Looks like this is called a timer wheel.

edit

As Sentinel pointed out, events should be dispatched to a thread pool. The handler for these events should do a small bit of work as quickly as possible, and without blocking. If it needs to do I/O, it should fire off an async task and terminate. Otherwise, a thread pool would overflow.

The .NET 4.0 Task class might be helpful here, particularly for its continuation methods.
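
A minimal sketch of this single-timer idea, with illustrative names only (SingleTimerScheduler is not from the answer): events are kept sorted by due time, one System.Threading.Timer is re-armed for whichever event comes next, and the work itself is pushed to the thread pool.

    using System;
    using System.Collections.Generic;
    using System.Threading;

    // Illustrative single-timer scheduler: keeps events sorted by due time and
    // re-arms one System.Threading.Timer for the earliest one.
    public class SingleTimerScheduler
    {
        private readonly object _sync = new object();
        private readonly SortedList<DateTime, Action> _events = new SortedList<DateTime, Action>();
        private readonly Timer _timer;

        public SingleTimerScheduler()
        {
            _timer = new Timer(_ => Fire());   // not armed until Change is called
        }

        public void Schedule(DateTime dueUtc, Action action)
        {
            lock (_sync)
            {
                while (_events.ContainsKey(dueUtc))      // keep keys unique for this sketch
                    dueUtc = dueUtc.AddTicks(1);
                _events.Add(dueUtc, action);
                Rearm();
            }
        }

        private void Fire()
        {
            lock (_sync)
            {
                // Dispatch everything that is due, then re-arm for the next event
                while (_events.Count > 0 && _events.Keys[0] <= DateTime.UtcNow)
                {
                    Action action = _events.Values[0];
                    _events.RemoveAt(0);
                    ThreadPool.QueueUserWorkItem(_ => action());  // handlers should stay short
                }
                Rearm();
            }
        }

        private void Rearm()
        {
            if (_events.Count == 0) return;
            double ms = (_events.Keys[0] - DateTime.UtcNow).TotalMilliseconds;
            _timer.Change((long)Math.Max(0.0, ms), Timeout.Infinite);
        }
    }

As the answer notes, the callbacks handed to the pool should do a small bit of work without blocking; anything that needs I/O should start an async operation and return.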

人心善变 2024-10-04 19:57:13

The tradeoff in your three options is between memory and CPU. More timers mean more timer nodes (memory), and aggregating these timers into fewer timers means more CPU, as you check for events that need servicing at run time. The CPU overhead in starting too many timers (and expiring them) is not too great an issue with a decent timer implementation.

So, in my opinion, if you have a good timer implementation, choose to start as many timers as you need (be as granular as possible). But if any of these per-object timers are mutually exclusive, consider reusing a timer node.

痴情 2024-10-04 19:57:13

This reminds me of the old airline ticketing systems, where you had queues. Ticketing requests were put in different queues depending on what kind of attention they needed.

So maybe you could have the queue of objects requiring frequent attention, and the queue of objects requiring infrequent attention. When necessary, you move them from one to the other.

You could have a timer for the frequent queue, and a timer for the infrequent queue. For the frequent queue, you could split it into multiple queues, one for each thread.

For crunching the frequent queue(s), you should not have more threads than you have cores. If you have two cores, what you want to do is get both of them cranking. Any more threads than that will not make things any faster. In fact, if processing the objects requires disk I/O or getting in line for some other shared hardware, it may not even help to get both cores running.
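
A rough sketch of the two-queue idea under these assumptions (the class name, intervals, and the modelling of "attention" as a callback are all illustrative, not from the answer):

    using System;
    using System.Collections.Generic;
    using System.Threading;

    // Illustrative split: objects needing frequent attention are serviced every
    // second, the rest every minute. Objects can be moved between the two lists.
    public class TwoQueueScheduler
    {
        private readonly object _sync = new object();
        private readonly List<Action> _frequent = new List<Action>();
        private readonly List<Action> _infrequent = new List<Action>();
        private readonly Timer _fastTimer;
        private readonly Timer _slowTimer;

        public TwoQueueScheduler()
        {
            _fastTimer = new Timer(_ => Service(_frequent), null, 1000, 1000);      // every second
            _slowTimer = new Timer(_ => Service(_infrequent), null, 60000, 60000);  // every minute
        }

        public void AddFrequent(Action work)   { lock (_sync) _frequent.Add(work); }
        public void AddInfrequent(Action work) { lock (_sync) _infrequent.Add(work); }

        private void Service(List<Action> queue)
        {
            Action[] snapshot;
            lock (_sync) snapshot = queue.ToArray();     // copy under lock, run outside it
            foreach (var work in snapshot)
                ThreadPool.QueueUserWorkItem(_ => work());   // keep handlers short
        }
    }

Moving an object between queues is then just removing its callback from one list and adding it to the other under the same lock.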
