C#:更好的线程架构

发布于 2024-09-25 13:10:16 字数 326 浏览 0 评论 0 原文

我有兴趣从您那里得到一些关于什么是一个好的/更好的线程架构的想法,该架构尊重下面描述的规则:

  • 线程必须在应用程序的生命周期中运行,如果有的话,处于睡眠/等待模式队列中没有要执行的工作。

  • 线程必须具有 BelowNormal 优先级(这消除了使用 ThreadPool 的可能性)。

  • 线程必须在任务完成后向主线程提供反馈。

  • 线程将监视队列以获得更多要执行的作业。

我正在使用 .Net Framework 4.0

让我知道你的想法:)

I am interested to get some ideas from you about what would be a good/better threading architecture that respects the rules described below:

  • A thread must be running for the life of the application, in the sleep/wait mode if there is no work in the queue to be performed.

  • A thread must be of a BelowNormal priority (this eliminates possibility of using ThreadPool).

  • The thread must give its feedback to the main thread upon completion of task.

  • Thread will monitor a Queue<T> to get more jobs to be performed.

I am using .Net Framework 4.0

Let me know what you think :)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

水晶透心 2024-10-02 13:10:16

当我需要实现自己的多线程处理时,我通常使用这样的东西:

public class MyWorker<T> : IDisposable
{
    private readonly Queue<T> _taskQueue; // task queue
    private readonly object _threadLock = new object();
    private Thread _thread; // worker thread
    private ManualResetEvent _evExit;
    private AutoResetEvent _evNewData;

    /// <summary>Override this to process data.</summary>
    protected abstract void ProcessData(T data);

    /// <summary>Override this to set other thread priority.</summary>
    protected virtual ThreadPriority ThreadPriority
    {
        get { return ThreadPriority.BelowNormal; }
    }

    protected MyWorker()
    {
        _taskQueue = new Queue<T>();
        _evExit = new ManualResetEvent(false);
        _evNewData = new AutoResetEvent(false);
    }

    ~MyWorker()
    {
        Dispose(false);
    }

    private void ThreadProc()
    {
        try
        {
            var wh = new WaitHandle[] { _evExit, _evNewData };
            while(true)
            {
                T data = default(T);
                bool gotData = false;
                lock(_taskQueue) // sync
                {
                    if(_taskQueue.Count != 0) // have data?
                    {
                        data = _taskQueue.Dequeue();
                        gotData = true;
                    }
                }
                if(!gotData)
                {
                    if(WaitHandle.WaitAny(wh) == 0) return; // demanded stop
                    continue; //we have data now, grab it
                }
                ProcessData(data);
                if(_evExit.WaitOne(0)) return;
            }
        }
        catch(ThreadInterruptedException)
        {
            // log warning - this is not normal
        }
        catch(ThreadAbortException)
        {
            // log warning - this is not normal
        }
    }

    public void Start()
    {
        lock(_threadLock)
        {
            if(_thread != null)
                throw new InvalidOperationException("Already running.");
            _thread = new Thread(ThreadProc)
            {
                Name = "Worker Thread",
                IsBackground = true,
                Priority = ThreadPriority,
            };
            _thread.Start();
        }
    }

    public void Stop()
    {
        lock(_threadLock)
        {
            if(_thread == null)
                throw new InvalidOperationException("Is not running.");
            _evExit.Set();
            if(!_thread.Join(1000))
                _thread.Abort();
            _thread = null;
        }
    }

    /// <summary>Enqueue data for processing.</summary>
    public void EnqueueData(T data)
    {
        lock(_taskQueue)
        {
            _taskQueue.Enqueue(data);
            _evNewData.Set(); // wake thread if it is sleeping
        }
    }

    /// <summary>Clear all pending data processing requests.</summary>
    public void ClearData()
    {
        lock(_taskQueue)
        {
            _taskQueue.Clear();
            _evNewData.Reset();
        }
    }

    protected virtual void Dispose(bool disposing)
    {
        lock(_threadLock)
        {
            if(_thread != null)
            {
                _evExit.Set();
                if(!_thread.Join(1000))
                    _thread.Abort();
                _thread = null;
            }
        }
        _evExit.Close();
        _evNewData.Close();
        if(disposing)
            _taskQueue.Clear();
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}

When I need to implement my own multi-threaded processing, I usually use something like this:

public class MyWorker<T> : IDisposable
{
    private readonly Queue<T> _taskQueue; // task queue
    private readonly object _threadLock = new object();
    private Thread _thread; // worker thread
    private ManualResetEvent _evExit;
    private AutoResetEvent _evNewData;

    /// <summary>Override this to process data.</summary>
    protected abstract void ProcessData(T data);

    /// <summary>Override this to set other thread priority.</summary>
    protected virtual ThreadPriority ThreadPriority
    {
        get { return ThreadPriority.BelowNormal; }
    }

    protected MyWorker()
    {
        _taskQueue = new Queue<T>();
        _evExit = new ManualResetEvent(false);
        _evNewData = new AutoResetEvent(false);
    }

    ~MyWorker()
    {
        Dispose(false);
    }

    private void ThreadProc()
    {
        try
        {
            var wh = new WaitHandle[] { _evExit, _evNewData };
            while(true)
            {
                T data = default(T);
                bool gotData = false;
                lock(_taskQueue) // sync
                {
                    if(_taskQueue.Count != 0) // have data?
                    {
                        data = _taskQueue.Dequeue();
                        gotData = true;
                    }
                }
                if(!gotData)
                {
                    if(WaitHandle.WaitAny(wh) == 0) return; // demanded stop
                    continue; //we have data now, grab it
                }
                ProcessData(data);
                if(_evExit.WaitOne(0)) return;
            }
        }
        catch(ThreadInterruptedException)
        {
            // log warning - this is not normal
        }
        catch(ThreadAbortException)
        {
            // log warning - this is not normal
        }
    }

    public void Start()
    {
        lock(_threadLock)
        {
            if(_thread != null)
                throw new InvalidOperationException("Already running.");
            _thread = new Thread(ThreadProc)
            {
                Name = "Worker Thread",
                IsBackground = true,
                Priority = ThreadPriority,
            };
            _thread.Start();
        }
    }

    public void Stop()
    {
        lock(_threadLock)
        {
            if(_thread == null)
                throw new InvalidOperationException("Is not running.");
            _evExit.Set();
            if(!_thread.Join(1000))
                _thread.Abort();
            _thread = null;
        }
    }

    /// <summary>Enqueue data for processing.</summary>
    public void EnqueueData(T data)
    {
        lock(_taskQueue)
        {
            _taskQueue.Enqueue(data);
            _evNewData.Set(); // wake thread if it is sleeping
        }
    }

    /// <summary>Clear all pending data processing requests.</summary>
    public void ClearData()
    {
        lock(_taskQueue)
        {
            _taskQueue.Clear();
            _evNewData.Reset();
        }
    }

    protected virtual void Dispose(bool disposing)
    {
        lock(_threadLock)
        {
            if(_thread != null)
            {
                _evExit.Set();
                if(!_thread.Join(1000))
                    _thread.Abort();
                _thread = null;
            }
        }
        _evExit.Close();
        _evNewData.Close();
        if(disposing)
            _taskQueue.Clear();
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}
寄离 2024-10-02 13:10:16
  • 线程的优先级必须低于正常(这消除了使用线程池的可能性)。

这似乎是使用 TPL 和 ThreadPool 的主要障碍。您确定您没有高估较低优先级的用处吗?

您将不得不投入大量工作来想出一些总是不如 TPL 强大(且测试/可靠性低得多)的东西。

我会重新考虑这一点。

  • A thread must be of a BelowNormal priority (this eliminates possibility of using ThreadPool).

This seems to be the main stumbling block for using the TPL and ThreadPool. Are you sure you're not over-estimating the usefulness of a lower priority?

You will have to put in a lot of work to come up with something that will always be much less powerful (and much less tested/reliable) than the TPL.

I would reconsider this.

り繁华旳梦境 2024-10-02 13:10:16

通过阅读上述条件

一些问题

1-是否有任何其他线程将填充队列中的作业< T> ?

如果答案是肯定的,则可以在此处使用生产者/消费者设计模式,我不知道 .net 4.0,但此设计可以在 .net 3.5 中实现。

例如,请参阅此处

By reading above conditions

Some questions

1- Is there any other thread which will populate jobs in Queue< T > ?

if the answer is yes than Producer / Consumer Deign Pattern can be used here i am not aware of .net 4.0 but this design can be implemented in .net 3.5.

See here for example.

温暖的光 2024-10-02 13:10:16

就我个人而言,我通常会自己推出,因为我喜欢更严格的控制。

我在媒体浏览器中使用它:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using MediaBrowser.Library.Logging;

namespace MediaBrowser.Library.Threading {

    public static class Async {

        public const string STARTUP_QUEUE = "Startup Queue";

        class ThreadPool {
            List<Action> actions = new List<Action>();
            List<Thread> threads = new List<Thread>();
            string name;
            volatile int maxThreads = 1;

            public ThreadPool(string name) {
                Debug.Assert(name != null);
                if (name == null) {
                    throw new ArgumentException("name should not be null");
                }
                this.name = name;
            }


            public void SetMaxThreads(int maxThreads) {
                Debug.Assert(maxThreads > 0);
                if (maxThreads < 1) {
                    throw new ArgumentException("maxThreads should be larger than 0");
                }

                this.maxThreads = maxThreads;
            }

            public void Queue(Action action, bool urgent) {
                Queue(action, urgent, 0);
            }

            public void Queue(Action action, bool urgent, int delay) {

                if (delay > 0) {
                    Timer t = null;
                    t = new Timer(_ =>
                    {
                        Queue(action, urgent, 0);
                        t.Dispose();
                    }, null, delay, Timeout.Infinite);
                    return;
                }

                lock (threads) {
                    // we are spinning up too many threads
                    // should be fixed 
                    if (maxThreads > threads.Count) {
                        Thread t = new Thread(new ThreadStart(ThreadProc));
                        t.IsBackground = true;
                        // dont affect the UI.
                        t.Priority = ThreadPriority.Lowest;
                        t.Name = "Worker thread for " + name;
                        t.Start();
                        threads.Add(t);
                    }
                }

                lock (actions) {
                    if (urgent) {
                        actions.Insert(0, action);
                    } else {
                        actions.Add(action);
                    }

                    Monitor.Pulse(actions);
                }
            }

            private void ThreadProc() {

                while (true) {

                    lock (threads) {
                        if (maxThreads < threads.Count) {
                            threads.Remove(Thread.CurrentThread);
                            break;
                        }
                    }

                    List<Action> copy;

                    lock (actions) {
                        while (actions.Count == 0) {
                            Monitor.Wait(actions);
                        }
                        copy = new List<Action>(actions);
                        actions.Clear();
                    }

                    foreach (var action in copy) {
                        action();
                    }
                }
            }
        }


        static Dictionary<string, ThreadPool> threadPool = new Dictionary<string, ThreadPool>();

        public static Timer Every(int milliseconds, Action action) {
            Timer timer = new Timer(_ => action(), null, 0, milliseconds);
            return timer;
        }

        public static void SetMaxThreads(string uniqueId, int threads) {
            GetThreadPool(uniqueId).SetMaxThreads(threads);
        }

        public static void Queue(string uniqueId, Action action) {
            Queue(uniqueId, action, null);
        }

        public static void Queue(string uniqueId, Action action, int delay) {
            Queue(uniqueId, action, null,false, delay);
        }

        public static void Queue(string uniqueId, Action action, Action done) {
            Queue(uniqueId, action, done, false);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent) {
            Queue(uniqueId, action, done, urgent, 0);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent, int delay) {

            Debug.Assert(uniqueId != null);
            Debug.Assert(action != null);

            Action workItem = () =>
            {
                try {
                    action();
                } catch (ThreadAbortException) { /* dont report on this, its normal */ } catch (Exception ex) {
                    Debug.Assert(false, "Async thread crashed! This must be fixed. " + ex.ToString());
                    Logger.ReportException("Async thread crashed! This must be fixed. ", ex);
                }
                if (done != null) done();
            };

            GetThreadPool(uniqueId).Queue(workItem, urgent, delay);
        }

        private static ThreadPool GetThreadPool(string uniqueId) {
            ThreadPool currentPool;
            lock (threadPool) {
                if (!threadPool.TryGetValue(uniqueId, out currentPool)) {
                    currentPool = new ThreadPool(uniqueId);
                    threadPool[uniqueId] = currentPool;
                }
            }
            return currentPool;
        }
    }

}

它有一个相当优雅的 API,我有一天想添加的唯一功能是清理空线程池。

用法:

 // Set the threads for custom thread pool 
 Async.SetMaxThreads("Queue Name", 10); 
 // Perform an action on the custom threadpool named: "Queue Name", when done call ImDone  
 Async.Queue("Queue Name", () => DoSomeThing(foo), () => ImDone(foo)); 

这有一些方便的重载,允许您对延迟的操作进行排队,另一个重载可以推送跳到队列前面的紧急作业。

Personally I roll my own usually, because I like having much tighter control.

I use this in Media Browser:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using MediaBrowser.Library.Logging;

namespace MediaBrowser.Library.Threading {

    public static class Async {

        public const string STARTUP_QUEUE = "Startup Queue";

        class ThreadPool {
            List<Action> actions = new List<Action>();
            List<Thread> threads = new List<Thread>();
            string name;
            volatile int maxThreads = 1;

            public ThreadPool(string name) {
                Debug.Assert(name != null);
                if (name == null) {
                    throw new ArgumentException("name should not be null");
                }
                this.name = name;
            }


            public void SetMaxThreads(int maxThreads) {
                Debug.Assert(maxThreads > 0);
                if (maxThreads < 1) {
                    throw new ArgumentException("maxThreads should be larger than 0");
                }

                this.maxThreads = maxThreads;
            }

            public void Queue(Action action, bool urgent) {
                Queue(action, urgent, 0);
            }

            public void Queue(Action action, bool urgent, int delay) {

                if (delay > 0) {
                    Timer t = null;
                    t = new Timer(_ =>
                    {
                        Queue(action, urgent, 0);
                        t.Dispose();
                    }, null, delay, Timeout.Infinite);
                    return;
                }

                lock (threads) {
                    // we are spinning up too many threads
                    // should be fixed 
                    if (maxThreads > threads.Count) {
                        Thread t = new Thread(new ThreadStart(ThreadProc));
                        t.IsBackground = true;
                        // dont affect the UI.
                        t.Priority = ThreadPriority.Lowest;
                        t.Name = "Worker thread for " + name;
                        t.Start();
                        threads.Add(t);
                    }
                }

                lock (actions) {
                    if (urgent) {
                        actions.Insert(0, action);
                    } else {
                        actions.Add(action);
                    }

                    Monitor.Pulse(actions);
                }
            }

            private void ThreadProc() {

                while (true) {

                    lock (threads) {
                        if (maxThreads < threads.Count) {
                            threads.Remove(Thread.CurrentThread);
                            break;
                        }
                    }

                    List<Action> copy;

                    lock (actions) {
                        while (actions.Count == 0) {
                            Monitor.Wait(actions);
                        }
                        copy = new List<Action>(actions);
                        actions.Clear();
                    }

                    foreach (var action in copy) {
                        action();
                    }
                }
            }
        }


        static Dictionary<string, ThreadPool> threadPool = new Dictionary<string, ThreadPool>();

        public static Timer Every(int milliseconds, Action action) {
            Timer timer = new Timer(_ => action(), null, 0, milliseconds);
            return timer;
        }

        public static void SetMaxThreads(string uniqueId, int threads) {
            GetThreadPool(uniqueId).SetMaxThreads(threads);
        }

        public static void Queue(string uniqueId, Action action) {
            Queue(uniqueId, action, null);
        }

        public static void Queue(string uniqueId, Action action, int delay) {
            Queue(uniqueId, action, null,false, delay);
        }

        public static void Queue(string uniqueId, Action action, Action done) {
            Queue(uniqueId, action, done, false);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent) {
            Queue(uniqueId, action, done, urgent, 0);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent, int delay) {

            Debug.Assert(uniqueId != null);
            Debug.Assert(action != null);

            Action workItem = () =>
            {
                try {
                    action();
                } catch (ThreadAbortException) { /* dont report on this, its normal */ } catch (Exception ex) {
                    Debug.Assert(false, "Async thread crashed! This must be fixed. " + ex.ToString());
                    Logger.ReportException("Async thread crashed! This must be fixed. ", ex);
                }
                if (done != null) done();
            };

            GetThreadPool(uniqueId).Queue(workItem, urgent, delay);
        }

        private static ThreadPool GetThreadPool(string uniqueId) {
            ThreadPool currentPool;
            lock (threadPool) {
                if (!threadPool.TryGetValue(uniqueId, out currentPool)) {
                    currentPool = new ThreadPool(uniqueId);
                    threadPool[uniqueId] = currentPool;
                }
            }
            return currentPool;
        }
    }

}

It has a fairly elegant API, only feature I would like to add one day is scavenging empty thread pools.

Usage:

 // Set the threads for custom thread pool 
 Async.SetMaxThreads("Queue Name", 10); 
 // Perform an action on the custom threadpool named: "Queue Name", when done call ImDone  
 Async.Queue("Queue Name", () => DoSomeThing(foo), () => ImDone(foo)); 

This has a few handy oveloads that allow you to queue delayed actions, and another to push in urgent jobs that skip to the front of the queue.

酷到爆炸 2024-10-02 13:10:16

这种情况BlockingCollection大声尖叫并且清晰。创建一个专用线程来监视队列并适当设置其优先级。当队列中没有项目时,BlockingCollection.Take 方法将自动阻塞。

public class Example
{
  private BlockingCollection<WorkItem> m_Queue = new BlockingCollection<WorkItem>();

  public event EventHandler<WorkItemEventArgs> WorkItemCompleted;

  public Example()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          WorkItem item = m_Queue.Take();
          // Add code to process the work item here.
          if (WorkItemCompleted != null)
          {
             WorkItemCompleted(this, new WorkItemEventArgs(item));
          }
        }
      });
    thread.IsBackground = true;
    thread.Priority = ThreadPriority.BelowNormal;
    thread.Start();
  }

  public void Add(WorkItem item)
  {
    m_Queue.Add(item);
  }

}

This situation screams BlockingCollection loud and clear. Create a dedicated thread that watches the queue with its priority set approriately. The BlockingCollection.Take method will block automatically when there are no items in the queue.

public class Example
{
  private BlockingCollection<WorkItem> m_Queue = new BlockingCollection<WorkItem>();

  public event EventHandler<WorkItemEventArgs> WorkItemCompleted;

  public Example()
  {
    var thread = new Thread(
      () =>
      {
        while (true)
        {
          WorkItem item = m_Queue.Take();
          // Add code to process the work item here.
          if (WorkItemCompleted != null)
          {
             WorkItemCompleted(this, new WorkItemEventArgs(item));
          }
        }
      });
    thread.IsBackground = true;
    thread.Priority = ThreadPriority.BelowNormal;
    thread.Start();
  }

  public void Add(WorkItem item)
  {
    m_Queue.Add(item);
  }

}
少跟Wǒ拽 2024-10-02 13:10:16

线程池听起来就是这样。实际上,你可以通过设置进程优先级来改变.NET自带线程池的优先级。将进程优先级降低一个档次,并将您的 UI 提高一个档次,您应该拥有正常优先级的 UI 和较低优先级的线程池。

A thread pool sounds like just the thing. Actually, you can change the priority of .NET's own thread pool by setting the process priority. Bump the process priority down a notch and your UI up a notch and you should have a UI at normal priority and thread pool at lower priority.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文