串行任务执行器;这个线程安全吗?

发布于 2024-09-18 07:02:07 字数 2915 浏览 4 评论 0原文

我创建了一个类,以允许异步顺序执行任务,使用 ThreadPool 作为执行手段。我的想法是,我将有多个实例在后台运行串行任务,但我不想为每个实例都有一个单独的专用线程。我想检查的是这个类是否真的是线程安全的。它相当简短,所以我想我应该由这里的专家来运行它,以防我遗漏一些明显的东西。我省略了一些针对不同操作类型的便利重载。

/// <summary>
/// This class wraps ThreadPool.QueueUserWorkItem, but providing guaranteed ordering of queued tasks for this instance.
/// Only one task in the queue will execute at a time, with the order of execution matching the order of addition.
/// This is designed as a lighter-weight alternative to using a dedicated Thread for processing of sequential tasks.
/// </summary>
public sealed class SerialAsyncTasker
{
    private readonly Queue<Action> mTasks = new Queue<Action>();
    private bool mTaskExecuting;

    /// <summary>
    /// Queue a new task for asynchronous execution on the thread pool.
    /// </summary>
    /// <param name="task">Task to execute</param>
    public void QueueTask(Action task)
    {
        if (task == null) throw new ArgumentNullException("task");

        lock (mTasks)
        {
            bool isFirstTask = (mTasks.Count == 0);
            mTasks.Enqueue(task);

            //Only start executing the task if this is the first task
            //Additional tasks will be executed normally as part of sequencing
            if (isFirstTask && !mTaskExecuting)
                RunNextTask();
        }
    }

    /// <summary>
    /// Clear all queued tasks.  Any task currently executing will continue to execute.
    /// </summary>
    public void Clear()
    {
        lock (mTasks)
        {
            mTasks.Clear();
        }
    }

    /// <summary>
    /// Wait until all currently queued tasks have completed executing.
    /// If no tasks are queued, this method will return immediately.
    /// This method does not prevent the race condition of a second thread 
    /// queueing a task while one thread is entering the wait;
    /// if this is required, it must be synchronized externally.
    /// </summary>
    public void WaitUntilAllComplete()
    {
        lock (mTasks)
        {
            while (mTasks.Count > 0 || mTaskExecuting)
                Monitor.Wait(mTasks);
        }
    }

    private void RunTask(Object state)
    {
        var task = (Action)state;
        task();
        mTaskExecuting = false;
        RunNextTask();
    }

    private void RunNextTask()
    {
        lock (mTasks)
        {
            if (mTasks.Count > 0)
            {
                mTaskExecuting = true;
                var task = mTasks.Dequeue();
                ThreadPool.QueueUserWorkItem(RunTask, task);
            }
            else
            {
                //If anybody is waiting for tasks to be complete, let them know
                Monitor.PulseAll(mTasks);
            }
        }
    }
}

更新:我修改了代码以修复西蒙指出的主要错误。现在已经通过了单元测试,但我仍然欢迎观察。

I have a class that I've created to allow asynchronous sequential execution of tasks, using the ThreadPool as the means of execution. The idea is that I'll have multiple instances running serial tasks in the background, but I don't want to have a separate dedicated Thread for each instance. What I'd like to check is whether this class is actually thread safe. It's fairly brief, so I thought I'd run it by the experts here, in case I'm missing something obvious. I've omitted a few of the convenience overloads for different Action types.

/// <summary>
/// This class wraps ThreadPool.QueueUserWorkItem, but providing guaranteed ordering of queued tasks for this instance.
/// Only one task in the queue will execute at a time, with the order of execution matching the order of addition.
/// This is designed as a lighter-weight alternative to using a dedicated Thread for processing of sequential tasks.
/// </summary>
public sealed class SerialAsyncTasker
{
    private readonly Queue<Action> mTasks = new Queue<Action>();
    private bool mTaskExecuting;

    /// <summary>
    /// Queue a new task for asynchronous execution on the thread pool.
    /// </summary>
    /// <param name="task">Task to execute</param>
    public void QueueTask(Action task)
    {
        if (task == null) throw new ArgumentNullException("task");

        lock (mTasks)
        {
            bool isFirstTask = (mTasks.Count == 0);
            mTasks.Enqueue(task);

            //Only start executing the task if this is the first task
            //Additional tasks will be executed normally as part of sequencing
            if (isFirstTask && !mTaskExecuting)
                RunNextTask();
        }
    }

    /// <summary>
    /// Clear all queued tasks.  Any task currently executing will continue to execute.
    /// </summary>
    public void Clear()
    {
        lock (mTasks)
        {
            mTasks.Clear();
        }
    }

    /// <summary>
    /// Wait until all currently queued tasks have completed executing.
    /// If no tasks are queued, this method will return immediately.
    /// This method does not prevent the race condition of a second thread 
    /// queueing a task while one thread is entering the wait;
    /// if this is required, it must be synchronized externally.
    /// </summary>
    public void WaitUntilAllComplete()
    {
        lock (mTasks)
        {
            while (mTasks.Count > 0 || mTaskExecuting)
                Monitor.Wait(mTasks);
        }
    }

    private void RunTask(Object state)
    {
        var task = (Action)state;
        task();
        mTaskExecuting = false;
        RunNextTask();
    }

    private void RunNextTask()
    {
        lock (mTasks)
        {
            if (mTasks.Count > 0)
            {
                mTaskExecuting = true;
                var task = mTasks.Dequeue();
                ThreadPool.QueueUserWorkItem(RunTask, task);
            }
            else
            {
                //If anybody is waiting for tasks to be complete, let them know
                Monitor.PulseAll(mTasks);
            }
        }
    }
}

UPDATE: I've revised the code to fix the main bugs kindly pointed out by Simon. This passes unit tests now, but I still welcome observations.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

冬天旳寂寞 2024-09-25 07:02:07

不要这样做。 (或者至少避免构建自己的东西。)

使用系统.Threading.Tasks 内容(.NET 4.0 中的新增功能)。创建一个任务[](大小取决于您想要的并行任务的数量)并让他们从 BlockingCollection 读取工作项 在等待 CancellationToken 时。您的 WaitForAll 实现将触发您的令牌,并调用 Task .WaitAll(Task[]) 它将阻塞,直到所有任务完成。

Don't do it. (Or at least avoid building your own stuff.)

Use the System.Threading.Tasks stuff (new in .NET 4.0). Create your a Task[] (size depends on number of parallel tasks you want) and let them read work items from a BlockingCollection while waiting for a CancellationToken. Your WaitForAll implementation would trigger your token, and call Task.WaitAll(Task[]) which will block until all your tasks are done.

屋顶上的小猫咪 2024-09-25 07:02:07

这是我的第二个答案,假设您无法使用 .NET 4.0(并且希望对现有代码进行评论)。

QueueTask 将第一个任务放入队列,获取 isFirstTask = true,并启动一个新线程。但是,当第一个线程正在处理时,另一个线程可能会将某些内容排入队列,并且 Count == 0 => isFirstTask = true,并且又生成了另一个线程。

此外,如果任务执行抛出异常(这可能不一定会导致所有内容崩溃,具体取决于异常处理),WaitUntilAllComplete 将无限期挂起,从而导致它跳过对 RunNextTask() 的调用。

并且您的 WaitUntilAllComplete 只是等待,直到不再有排队任务,而不是当前正在执行的任务实际上正在执行(它们可能只是在 ThreadPool 中排队)或完成。

Here's my second answer assuming that you cant use .NET 4.0 (and want comments on your existing code).

QueueTask enqueues the first task, getting isFirstTask = true, and starts a new thread. However, another thread may enqueue something while the first thread is processing, and Count == 0 => isFirstTask = true, and yet another thread is spawned.

Also, WaitUntilAllComplete will hang indefinitely if the task execution throws an exception (which may not necessarily crash everything, depending on exception handling), causing it to skip the call to RunNextTask().

And your WaitUntilAllComplete just waits until there are no more enqueue tasks, not that those currently executing are actually executing (they could just be enqueued in the ThreadPool) or complete.

沦落红尘 2024-09-25 07:02:07

它内置于 4.0

如何:创建限制并发程度的任务计划程序< /a>

您还可以使用自定义调度程序来实现默认调度程序不提供的功能,例如严格的先进先出 (FIFO) 执行顺序。以下示例演示了如何创建自定义任务计划程序。该调度程序允许您指定并发程度。

It's built in in 4.0

How to: Create a Task Scheduler That Limits the Degree of Concurrency

You can also use a custom scheduler to achieve functionality that the default scheduler does not provide, such as strict first-in, first-out (FIFO) execution order. The following example demonstrates how to create a custom task scheduler. This scheduler lets you specify the degree of concurrency.

放肆 2024-09-25 07:02:07

我发现您的 SerialAsyncTasker 类存在一些问题,但听起来您可能很好地掌握了这些问题,因此我不会详细介绍该主题(我可能会用更多详细信息编辑我的答案)之后)。您在评论中指出,您不能使用 .NET 4.0 功能,也不能使用反应式扩展向后移植。我建议您在专用线程上使用生产者-消费者模式以及单个消费者。这完全符合您异步顺序执行任务的要求。

注意:您必须强化代码以支持正常关闭、处理异常等。

public class SerialAsyncTasker
{
  private BlockingCollection<Action> m_Queue = new BlockingCollection<Action>();

  public SerialAsyncTasker()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          Action task = m_Queue.Take();
          task();
        }
      });
    thread.IsBackground = true;
    thread.Start();
  }

  public void QueueTask(Action task)
  {
    m_Queue.Add(task);
  }
}

很遗憾您无法使用 .NET 4.0 BCL 或响应式扩展下载中的 BlockingCollection,但不用担心。其实自己实现一个并不太难。您可以使用 Stephen Toub 的阻塞队列 作为一个起点,只是重命名一些东西。

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take()
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0) Monitor.Wait(m_Queue);
            return m_Queue.Dequeue();
        }
    }

    public void Add(T value)
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(value);
            Monitor.Pulse(m_Queue);
        }
    }
}

I see a few issues your with your SerialAsyncTasker class, but it sounds like you might have a good grasp of those so I will not go into any details on that topic (I may edit my answer with more details later). You indicated in the comments that you cannot use .NET 4.0 features nor can you use the Reactive Extensions backport. I propose that you use the producer-consumer pattern with a single consumer on a dedicated thread. This would perfectly fit your requirement of asynchronously executing tasks sequentially.

Note: You will have to harden the code to support gracefully shutting down, handling exceptions, etc.

public class SerialAsyncTasker
{
  private BlockingCollection<Action> m_Queue = new BlockingCollection<Action>();

  public SerialAsyncTasker()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          Action task = m_Queue.Take();
          task();
        }
      });
    thread.IsBackground = true;
    thread.Start();
  }

  public void QueueTask(Action task)
  {
    m_Queue.Add(task);
  }
}

Too bad you cannot use the BlockingCollection from the .NET 4.0 BCL or Reactive Extension download, but no worries. It is actually not too hard to implement one yourself. You can use Stephen Toub's blocking queue as a starting point and just rename a few things.

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take()
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0) Monitor.Wait(m_Queue);
            return m_Queue.Dequeue();
        }
    }

    public void Add(T value)
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(value);
            Monitor.Pulse(m_Queue);
        }
    }
}
昨迟人 2024-09-25 07:02:07
public class ParallelExcecuter
{
    private readonly BlockingCollection<Task> _workItemHolder;

    public ParallelExcecuter(int maxDegreeOfParallelism)
    {
        _workItemHolder = new BlockingCollection<Task>(maxDegreeOfParallelism);
    }

    public void Submit(Action action)
    {
        _workItemHolder.Add(Task.Run(action).ContinueWith(t =>
        {
            _workItemHolder.Take();
        }));

    }

    public void WaitUntilWorkDone()
    {
        while (_workItemHolder.Count < 0)
        {
            Monitor.Wait(_workItemHolder);
        }
    }
}
public class ParallelExcecuter
{
    private readonly BlockingCollection<Task> _workItemHolder;

    public ParallelExcecuter(int maxDegreeOfParallelism)
    {
        _workItemHolder = new BlockingCollection<Task>(maxDegreeOfParallelism);
    }

    public void Submit(Action action)
    {
        _workItemHolder.Add(Task.Run(action).ContinueWith(t =>
        {
            _workItemHolder.Take();
        }));

    }

    public void WaitUntilWorkDone()
    {
        while (_workItemHolder.Count < 0)
        {
            Monitor.Wait(_workItemHolder);
        }
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文