我的自定义线程池有什么问题?

发布于 2024-07-11 18:22:23 字数 4466 浏览 3 评论 0原文

我创建了一个自定义线程池实用程序,但似乎有一个我找不到的问题。

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace iWallpaper.S3Uploader
{
/// <summary>
/// Singleton work queue. Items handed to <c>Execute</c> are enqueued, and a dispatcher
/// thread (started in the constructor, running <c>Worker</c>) starts short-lived threads
/// that each run <c>Proceed</c> and handle at most one queued item.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class QueueManager<T>
{
    // Non-generic queue wrapped by Queue.Synchronized; items are stored as object and cast back to T in Proceed.
    private readonly Queue queue = Queue.Synchronized(new Queue());
    // Set by Add after every enqueue; Worker waits on it when the queue is empty. Created in the signaled state.
    private readonly AutoResetEvent res = new AutoResetEvent(true);
    // Set by every Proceed thread as it finishes; Worker waits on it when Num_Of_Threads is not below maxThread.
    private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
    // Constructed with initial count 1 and maximum count 4. Proceed pairs each WaitOne with one Release,
    // so with these calls the count never rises above 1 and ProceedItem runs for one item at a time.
    // NOTE(review): this looks like the reason the work appears single-threaded - confirm the intended initial count.
    private readonly Semaphore sem = new Semaphore(1, 4);
    // The dispatcher thread that runs Worker. ProceedItem prints THIS thread's ManagedThreadId,
    // not the id of the thread that is executing the item.
    private readonly Thread thread;
    // Callback applied to each item; replaced by every call to Execute.
    private Action<T> DoWork;
    // Number of Proceed threads currently between their Interlocked.Increment and Interlocked.Decrement calls.
    private int Num_Of_Threads;

    /// <summary>
    /// Initializes the counters and starts the dispatcher thread. Private: the only
    /// construction site visible in this class is the <c>Nested.instance</c> initializer.
    /// </summary>
    private QueueManager()
    {
        Num_Of_Threads = 0;
        maxThread = 5;
        thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
        thread.Start();

        //   log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
    }

    /// <summary>
    /// Limit that <c>Worker</c> compares <c>Num_Of_Threads</c> against before starting
    /// another thread. Set to 5 by the constructor and overwritten by <c>Execute</c>.
    /// </summary>
    public int maxThread { get; set; }

    /// <summary>
    /// Returns the shared instance held by <c>Nested</c>.
    /// </summary>
    // NOTE(review): FileUploadQueueManager<T> is not defined in the code shown here; presumably a
    // leftover from before the class was renamed to QueueManager<T> - confirm.
    public static FileUploadQueueManager<T> Instance
    {
        get { return Nested.instance; }
    }

    /// <summary>
    /// Stores the thread limit and the callback, then enqueues every item of
    /// <paramref name="list"/>. The method itself does not invoke
    /// <paramref name="action"/> and contains no wait for the items to be processed.
    /// </summary>
    /// <param name="list">Items to enqueue.</param>
    /// <param name="action">Callback stored in <c>DoWork</c> and invoked per item by <c>ProceedItem</c>.</param>
    /// <param name="MaxThreads">Value assigned to <c>maxThread</c>.</param>
    public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        maxThread = MaxThreads;
        DoWork = action;
        foreach (T item in list)
        {
            Add(item);
        }
    }
    /// <summary>
    /// Runs <paramref name="action"/> for every item on the calling thread by delegating
    /// to the three-argument overload with a thread count of 0.
    /// </summary>
    /// <param name="list">Items to process.</param>
    /// <param name="action">Callback invoked once per item.</param>
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
    {
        ExecuteNoThread(list, action, 0);
    }
    /// <summary>
    /// Runs <paramref name="action"/> for every item, in order, on the calling thread.
    /// </summary>
    /// <param name="list">Items to process.</param>
    /// <param name="action">Callback invoked once per item.</param>
    /// <param name="MaxThreads">Not used by this method.</param>
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        foreach (T wallpaper in list)
        {
            action(wallpaper);
        }
    }
    /// <summary>
    /// Same as the three-argument <c>Execute</c> with a thread limit of 10.
    /// </summary>
    /// <param name="list">Items to enqueue.</param>
    /// <param name="action">Callback invoked per item.</param>
    public void Execute(IEnumerable<T> list, Action<T> action)
    {
        Execute(list, action, 10);
    }

    /// <summary>
    /// Enqueues one item while holding a lock on the queue object, then signals <c>res</c>.
    /// </summary>
    /// <param name="item">Item to enqueue.</param>
    private void Add(T item)
    {
        // This is the only place that takes lock (queue); Worker and Proceed access the queue without it.
        lock (queue)
        {
            queue.Enqueue(item);
        }
        res.Set();
    }

    /// <summary>
    /// Dispatcher loop; never returns. On each pass it waits on <c>res</c> if the queue is
    /// empty, then either starts a new thread running <c>Proceed</c> or waits on <c>res_thr</c>.
    /// </summary>
    private void Worker()
    {
        while (true)
        {
            if (queue.Count == 0)
            {
                res.WaitOne();
            }

            // Num_Of_Threads is incremented inside Proceed, not here, so a thread started below is
            // not counted until it actually begins running.
            // NOTE(review): this loop may therefore start more than maxThread threads - confirm.
            if (Num_Of_Threads < maxThread)
            {
                var t = new Thread(Proceed);
                t.Start();
            }
            else
            {
                res_thr.WaitOne();
            }
        }
    }

    /// <summary>
    /// Body of each spawned thread: registers itself in <c>Num_Of_Threads</c>, takes one
    /// item from the queue if it is non-empty, processes it under the semaphore, then
    /// signals <c>res_thr</c> and unregisters.
    /// </summary>
    private void Proceed()
    {
        Interlocked.Increment(ref Num_Of_Threads);
        // Count and Dequeue are two separate calls on the synchronized queue.
        // NOTE(review): another thread can empty the queue between them - confirm Dequeue cannot fail here.
        if (queue.Count > 0)
        {
            var item = (T) queue.Dequeue();

            // There is no try/finally: if ProceedItem throws, the Release, Set and Decrement below are skipped.
            sem.WaitOne();
            ProceedItem(item);
            sem.Release();
        }
        // res_thr is signaled before the counter is decremented.
        // NOTE(review): a woken Worker may still read the old Num_Of_Threads value - confirm the intended order.
        res_thr.Set();
        Interlocked.Decrement(ref Num_Of_Threads);
    }

    /// <summary>
    /// Invokes <c>DoWork</c> on the item (if a callback is set) and then updates the
    /// console title while holding a lock on <c>Instance</c>.
    /// </summary>
    /// <param name="activity">The item to process.</param>
    private void ProceedItem(T activity)
    {
        if (DoWork != null)
            DoWork(activity);

        lock (Instance)
        {
            // {0} is the dispatcher thread's id (the "thread" field), so it is the same on every call;
            // {4} is Num_Of_Threads, {1} the current time, {2} the remaining queue count, {3} the item.
            Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                          thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                          Num_Of_Threads);
        }
    }

    #region Nested type: Nested

    /// <summary>
    /// Holder for the singleton instance returned by <c>Instance</c>.
    /// </summary>
    protected class Nested
    {
        // The original comment here referred to an explicit static constructor (to keep the compiler
        // from marking the type beforefieldinit), but no static constructor is declared in this class.
        // NOTE(review): FileUploadQueueManager<T> is not defined in the code shown here - confirm the type name.
        internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>();
    }

    #endregion

}

问题

就在这里:

Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                      thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                      Num_Of_Threads);

标题中始终只有同一个线程 id。 并且程序似乎只在一个线程中工作。

示例用法:

        var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
        QueueManager<int>.Instance.Execute(i_list,
          i =>
          {
              Console.WriteLine("Some action under element number {0}", i);

          }, 5);

PS:这很混乱,但我仍在努力。

I've created a custom thread pool utility, but there seems to be a problem that I cannot find.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace iWallpaper.S3Uploader
{
/// <summary>
/// Singleton work queue. Items handed to <c>Execute</c> are enqueued, and a dispatcher
/// thread (started in the constructor, running <c>Worker</c>) starts short-lived threads
/// that each run <c>Proceed</c> and handle at most one queued item.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class QueueManager<T>
{
    // Non-generic queue wrapped by Queue.Synchronized; items are stored as object and cast back to T in Proceed.
    private readonly Queue queue = Queue.Synchronized(new Queue());
    // Set by Add after every enqueue; Worker waits on it when the queue is empty. Created in the signaled state.
    private readonly AutoResetEvent res = new AutoResetEvent(true);
    // Set by every Proceed thread as it finishes; Worker waits on it when Num_Of_Threads is not below maxThread.
    private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
    // Constructed with initial count 1 and maximum count 4. Proceed pairs each WaitOne with one Release,
    // so with these calls the count never rises above 1 and ProceedItem runs for one item at a time.
    // NOTE(review): this looks like the reason the work appears single-threaded - confirm the intended initial count.
    private readonly Semaphore sem = new Semaphore(1, 4);
    // The dispatcher thread that runs Worker. ProceedItem prints THIS thread's ManagedThreadId,
    // not the id of the thread that is executing the item.
    private readonly Thread thread;
    // Callback applied to each item; replaced by every call to Execute.
    private Action<T> DoWork;
    // Number of Proceed threads currently between their Interlocked.Increment and Interlocked.Decrement calls.
    private int Num_Of_Threads;

    /// <summary>
    /// Initializes the counters and starts the dispatcher thread. Private: the only
    /// construction site visible in this class is the <c>Nested.instance</c> initializer.
    /// </summary>
    private QueueManager()
    {
        Num_Of_Threads = 0;
        maxThread = 5;
        thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
        thread.Start();

        //   log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
    }

    /// <summary>
    /// Limit that <c>Worker</c> compares <c>Num_Of_Threads</c> against before starting
    /// another thread. Set to 5 by the constructor and overwritten by <c>Execute</c>.
    /// </summary>
    public int maxThread { get; set; }

    /// <summary>
    /// Returns the shared instance held by <c>Nested</c>.
    /// </summary>
    // NOTE(review): FileUploadQueueManager<T> is not defined in the code shown here; presumably a
    // leftover from before the class was renamed to QueueManager<T> - confirm.
    public static FileUploadQueueManager<T> Instance
    {
        get { return Nested.instance; }
    }

    /// <summary>
    /// Stores the thread limit and the callback, then enqueues every item of
    /// <paramref name="list"/>. The method itself does not invoke
    /// <paramref name="action"/> and contains no wait for the items to be processed.
    /// </summary>
    /// <param name="list">Items to enqueue.</param>
    /// <param name="action">Callback stored in <c>DoWork</c> and invoked per item by <c>ProceedItem</c>.</param>
    /// <param name="MaxThreads">Value assigned to <c>maxThread</c>.</param>
    public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        maxThread = MaxThreads;
        DoWork = action;
        foreach (T item in list)
        {
            Add(item);
        }
    }
    /// <summary>
    /// Runs <paramref name="action"/> for every item on the calling thread by delegating
    /// to the three-argument overload with a thread count of 0.
    /// </summary>
    /// <param name="list">Items to process.</param>
    /// <param name="action">Callback invoked once per item.</param>
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
    {
        ExecuteNoThread(list, action, 0);
    }
    /// <summary>
    /// Runs <paramref name="action"/> for every item, in order, on the calling thread.
    /// </summary>
    /// <param name="list">Items to process.</param>
    /// <param name="action">Callback invoked once per item.</param>
    /// <param name="MaxThreads">Not used by this method.</param>
    public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
    {
        foreach (T wallpaper in list)
        {
            action(wallpaper);
        }
    }
    /// <summary>
    /// Same as the three-argument <c>Execute</c> with a thread limit of 10.
    /// </summary>
    /// <param name="list">Items to enqueue.</param>
    /// <param name="action">Callback invoked per item.</param>
    public void Execute(IEnumerable<T> list, Action<T> action)
    {
        Execute(list, action, 10);
    }

    /// <summary>
    /// Enqueues one item while holding a lock on the queue object, then signals <c>res</c>.
    /// </summary>
    /// <param name="item">Item to enqueue.</param>
    private void Add(T item)
    {
        // This is the only place that takes lock (queue); Worker and Proceed access the queue without it.
        lock (queue)
        {
            queue.Enqueue(item);
        }
        res.Set();
    }

    /// <summary>
    /// Dispatcher loop; never returns. On each pass it waits on <c>res</c> if the queue is
    /// empty, then either starts a new thread running <c>Proceed</c> or waits on <c>res_thr</c>.
    /// </summary>
    private void Worker()
    {
        while (true)
        {
            if (queue.Count == 0)
            {
                res.WaitOne();
            }

            // Num_Of_Threads is incremented inside Proceed, not here, so a thread started below is
            // not counted until it actually begins running.
            // NOTE(review): this loop may therefore start more than maxThread threads - confirm.
            if (Num_Of_Threads < maxThread)
            {
                var t = new Thread(Proceed);
                t.Start();
            }
            else
            {
                res_thr.WaitOne();
            }
        }
    }

    /// <summary>
    /// Body of each spawned thread: registers itself in <c>Num_Of_Threads</c>, takes one
    /// item from the queue if it is non-empty, processes it under the semaphore, then
    /// signals <c>res_thr</c> and unregisters.
    /// </summary>
    private void Proceed()
    {
        Interlocked.Increment(ref Num_Of_Threads);
        // Count and Dequeue are two separate calls on the synchronized queue.
        // NOTE(review): another thread can empty the queue between them - confirm Dequeue cannot fail here.
        if (queue.Count > 0)
        {
            var item = (T) queue.Dequeue();

            // There is no try/finally: if ProceedItem throws, the Release, Set and Decrement below are skipped.
            sem.WaitOne();
            ProceedItem(item);
            sem.Release();
        }
        // res_thr is signaled before the counter is decremented.
        // NOTE(review): a woken Worker may still read the old Num_Of_Threads value - confirm the intended order.
        res_thr.Set();
        Interlocked.Decrement(ref Num_Of_Threads);
    }

    /// <summary>
    /// Invokes <c>DoWork</c> on the item (if a callback is set) and then updates the
    /// console title while holding a lock on <c>Instance</c>.
    /// </summary>
    /// <param name="activity">The item to process.</param>
    private void ProceedItem(T activity)
    {
        if (DoWork != null)
            DoWork(activity);

        lock (Instance)
        {
            // {0} is the dispatcher thread's id (the "thread" field), so it is the same on every call;
            // {4} is Num_Of_Threads, {1} the current time, {2} the remaining queue count, {3} the item.
            Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                          thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                          Num_Of_Threads);
        }
    }

    #region Nested type: Nested

    /// <summary>
    /// Holder for the singleton instance returned by <c>Instance</c>.
    /// </summary>
    protected class Nested
    {
        // The original comment here referred to an explicit static constructor (to keep the compiler
        // from marking the type beforefieldinit), but no static constructor is declared in this class.
        // NOTE(review): FileUploadQueueManager<T> is not defined in the code shown here - confirm the type name.
        internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>();
    }

    #endregion

}

}

Problem is here:

Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
                                      thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
                                      Num_Of_Threads);

There is always the same ONE thread id in the title, and the program seems to be doing all of its work on one thread.

Sample usage:

        var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
        QueueManager<int>.Instance.Execute(i_list,
          i =>
          {
              Console.WriteLine("Some action under element number {0}", i);

          }, 5);

P.S.: it's pretty messy, but I'm still working on it.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4)

梦晓ヶ微光ヅ倾城 2024-07-18 18:22:23

我查看了您的代码,这是我看到的几个问题。

  1. 尽管它已经是同步队列,您仍然对队列对象加了锁。 这是不必要的
  2. 您对队列对象的加锁并不一致。 要么每次访问都加锁,要么完全不加锁而依赖 Synchronized 的行为。
  3. Proceed 方法不是线程安全的。 这两行就是问题

     if (queue.Count > 0) { 
            var item = (T)queue.Dequeue(); 
          ... 
          } 
       
    
      

    使用同步队列只能保证各个访问的安全。 因此 .Count 和 .Dequeue 方法都不会扰乱队列的内部结构。 然而想象一下这样的场景:两个线程同时运行这些代码行,队列的计数为 1

    • 线程1:if (...) -> 正确
    • 线程2:if (...) -> 正确
    • 线程1:出队-> 成功
    • 线程2:出队-> 由于队列为空而失败
  4. Worker 和 Proceed 之间存在可能导致死锁的竞争条件。 应切换以下两行代码。

    代码:

     res_thr.Set() 
          Interlocked.Decrement(ref Num_Of_Threads);

    第一行将解除 Worker 方法的阻塞。 如果它运行得足够快,它会重新进入循环,发现 Num_Of_Threads 尚未递减(因此 Num_Of_Threads < maxThread 仍不成立),然后立即回到 res_thr.WaitOne()。 如果当前没有其他线程正在运行,这将导致代码死锁。 当最大线程数较小(例如 1)时,很容易触发这种情况。 交换这两行代码应该可以解决问题。

  5. maxThread 属性的值超过 4 之后似乎就没有作用了。 sem 对象初始化时最多只允许 4 个并发进入。 所有实际执行工作项的代码都必须经过此信号量。 因此,无论 maxThread 设置多高,并发处理的项目数实际上最多为 4。

I looked through your code and here are a couple of issues I saw.

  1. You lock the queue object even though it is synchronized queue. This is unnecessary
  2. You lock the queue object inconsistently. It should either be locked for every access, or never locked, relying instead on the Synchronized wrapper's behavior.
  3. The Proceed method is not thread safe. These two lines are the issue

        if (queue.Count > 0) {
          var item = (T)queue.Dequeue();
        ...
        }
    

    Using a synchronized queue only guarantees that individual accesses are safe. So neither the .Count nor the .Dequeue method will corrupt the internal structure of the queue. However, imagine the scenario where two threads run these lines of code at the same time with a queue of count 1

    • Thread1: if (...) -> true
    • Thread2: if (...) -> true
    • Thread1: dequeue -> success
    • Thread2: dequeue -> fails because the queue is empty
  4. There is a race condition between Worker and Proceed that can lead to deadlock. The following two lines of code should be switched.

    Code:

        res_thr.Set()
        Interlocked.Decrement(ref Num_Of_Threads);

    The first line will unblock the Worker method. If it runs quickly enough, it will go back through the loop, see that Num_Of_Threads has not been decremented yet (so Num_Of_Threads < maxThread is still false), and go right back into res_thr.WaitOne(). If no other threads are currently running then this will lead to a deadlock in your code. This is very easy to hit with a low number of maximum threads (say 1). Inverting these two lines of code should fix the issue.

  5. The maxThread count property does not seem to be useful beyond 4. The sem object is initialized to accept only 4 maximum concurrent entries. All code that actually executes an item must go through this semaphore. So you've effectively limited the maximum number of concurrent items to 4 regardless of how high maxThread is set.
北笙凉宸 2024-07-18 18:22:23

编写健壮的线程代码并非易事。 您可以查看许多线程池以供参考,但还要注意并行扩展(可作为 CTP,或更高版本的 .NET 4.0)包括许多开箱即用的附加线程构造(在 TPL/ CCR)。 例如,Parallel.For / Parallel.ForEach,它处理工作窃取,并有效地处理可用内核。

有关预滚动线程池的示例,请参阅 Jon Skeet 的 CustomThreadPool

Writing robust threaded code is not trivial. There are numerous thread-pools around that you might look at for reference, but also note that Parallel Extensions (available as CTP, or later in .NET 4.0) includes a lot of additional threading constructs out-of-the-box (in the TPL/CCR). For example, Parallel.For / Parallel.ForEach, which deal with work-stealing, and handling the available cores effectively.

For an example of a pre-rolled thread-pool, see Jon Skeet's CustomThreadPool here.

ゃ懵逼小萝莉 2024-07-18 18:22:23

我认为你可以让事情变得相当简单。

这是我使用的线程池的修改形式(我没有测试修改):

您唯一需要的同步原语是一个 Monitor,锁定在线程池上。 您不需要信号量或重置事件。

/// <summary>
/// A fixed-size pool of worker threads that execute queued <see cref="Action"/> delegates.
/// One monitor on a private lock object guards both the queue and the shutdown flag;
/// no semaphores or reset events are used.
/// </summary>
internal class ThreadPool
{
    private readonly Thread[] m_threads;
    private readonly Queue<Action> m_queue;
    private readonly object m_lockObj;
    private bool m_shutdown;

    /// <summary>
    /// Raised by <see cref="Shutdown"/> after the shutdown flag has been set, while the
    /// pool lock is still held and before the worker threads are joined.
    /// </summary>
    public event Action OnShuttingDown;

    /// <summary>
    /// Creates the pool and starts all of its worker threads.
    /// </summary>
    /// <param name="numberOfThreads">Number of worker threads; must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="numberOfThreads"/> is zero or negative.
    /// </exception>
    public ThreadPool(int numberOfThreads)
    {
        // Explicit check in place of the external Util.Assume helper, which this snippet does not define.
        if (numberOfThreads <= 0)
        {
            throw new ArgumentOutOfRangeException("numberOfThreads", "Invalid thread count!");
        }

        m_queue = new Queue<Action>();
        m_threads = new Thread[numberOfThreads];
        m_lockObj = new object();

        // Holding the lock keeps the workers parked at their first lock acquisition
        // until every thread has been created and started.
        lock (m_lockObj)
        {
            for (int i = 0; i < numberOfThreads; ++i)
            {
                m_threads[i] = new Thread(ThreadLoop);
                m_threads[i].Start();
            }
        }
    }

    /// <summary>
    /// Signals every worker to stop, raises <see cref="OnShuttingDown"/>, and blocks until
    /// all workers have exited. Items still in the queue at that point are not executed.
    /// </summary>
    public void Shutdown()
    {
        lock (m_lockObj)
        {
            m_shutdown = true;
            // Wake every waiting worker so it can observe the flag and exit.
            Monitor.PulseAll(m_lockObj);

            // Copy to a local so a concurrent unsubscribe cannot null the delegate between check and call.
            Action handler = OnShuttingDown;
            if (handler != null)
            {
                handler();
            }
        }

        foreach (var thread in m_threads)
        {
            thread.Join();
        }
    }

    /// <summary>
    /// Queues a delegate for execution on one of the worker threads.
    /// </summary>
    /// <param name="a">The work to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="a"/> is null.</exception>
    public void Enqueue(Action a)
    {
        if (a == null)
        {
            throw new ArgumentNullException("a");
        }

        lock (m_lockObj)
        {
            m_queue.Enqueue(a);
            Monitor.Pulse(m_lockObj);
        }
    }

    /// <summary>
    /// Worker body: repeatedly takes one delegate from the queue under the lock and runs it
    /// outside the lock, until shutdown is requested.
    /// </summary>
    private void ThreadLoop()
    {
        while (true)
        {
            Action work;

            lock (m_lockObj)
            {
                // Re-check the condition after every wake-up; a pulse only means "look again".
                while (!m_shutdown && m_queue.Count == 0)
                {
                    Monitor.Wait(m_lockObj);
                }

                if (m_shutdown)
                {
                    return;
                }

                work = m_queue.Dequeue();
                // Wake another waiting worker so any remaining items are picked up promptly.
                Monitor.Pulse(m_lockObj);
            }

            // Run the work item without holding the lock so other workers can proceed.
            try
            {
                work();
            }
            catch (Exception ex)
            {
                // A failing work item must not take the worker thread down with it.
                Console.WriteLine("An unhandled exception occured!\n:{0}", ex.Message);
            }
        }
    }
}

I think you can simplify things considerably.

Here is a modified form (I didn't test the modifications) of the thread pool I use:

The only sync. primitive you need is a Monitor, locked on the thread pool. You don't need a semaphore, or the reset events.

/// <summary>
/// A fixed-size pool of worker threads that execute queued <see cref="Action"/> delegates.
/// One monitor on a private lock object guards both the queue and the shutdown flag;
/// no semaphores or reset events are used.
/// </summary>
internal class ThreadPool
{
    private readonly Thread[] m_threads;
    private readonly Queue<Action> m_queue;
    private readonly object m_lockObj;
    private bool m_shutdown;

    /// <summary>
    /// Raised by <see cref="Shutdown"/> after the shutdown flag has been set, while the
    /// pool lock is still held and before the worker threads are joined.
    /// </summary>
    public event Action OnShuttingDown;

    /// <summary>
    /// Creates the pool and starts all of its worker threads.
    /// </summary>
    /// <param name="numberOfThreads">Number of worker threads; must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="numberOfThreads"/> is zero or negative.
    /// </exception>
    public ThreadPool(int numberOfThreads)
    {
        // Explicit check in place of the external Util.Assume helper, which this snippet does not define.
        if (numberOfThreads <= 0)
        {
            throw new ArgumentOutOfRangeException("numberOfThreads", "Invalid thread count!");
        }

        m_queue = new Queue<Action>();
        m_threads = new Thread[numberOfThreads];
        m_lockObj = new object();

        // Holding the lock keeps the workers parked at their first lock acquisition
        // until every thread has been created and started.
        lock (m_lockObj)
        {
            for (int i = 0; i < numberOfThreads; ++i)
            {
                m_threads[i] = new Thread(ThreadLoop);
                m_threads[i].Start();
            }
        }
    }

    /// <summary>
    /// Signals every worker to stop, raises <see cref="OnShuttingDown"/>, and blocks until
    /// all workers have exited. Items still in the queue at that point are not executed.
    /// </summary>
    public void Shutdown()
    {
        lock (m_lockObj)
        {
            m_shutdown = true;
            // Wake every waiting worker so it can observe the flag and exit.
            Monitor.PulseAll(m_lockObj);

            // Copy to a local so a concurrent unsubscribe cannot null the delegate between check and call.
            Action handler = OnShuttingDown;
            if (handler != null)
            {
                handler();
            }
        }

        foreach (var thread in m_threads)
        {
            thread.Join();
        }
    }

    /// <summary>
    /// Queues a delegate for execution on one of the worker threads.
    /// </summary>
    /// <param name="a">The work to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="a"/> is null.</exception>
    public void Enqueue(Action a)
    {
        if (a == null)
        {
            throw new ArgumentNullException("a");
        }

        lock (m_lockObj)
        {
            m_queue.Enqueue(a);
            Monitor.Pulse(m_lockObj);
        }
    }

    /// <summary>
    /// Worker body: repeatedly takes one delegate from the queue under the lock and runs it
    /// outside the lock, until shutdown is requested.
    /// </summary>
    private void ThreadLoop()
    {
        while (true)
        {
            Action work;

            lock (m_lockObj)
            {
                // Re-check the condition after every wake-up; a pulse only means "look again".
                while (!m_shutdown && m_queue.Count == 0)
                {
                    Monitor.Wait(m_lockObj);
                }

                if (m_shutdown)
                {
                    return;
                }

                work = m_queue.Dequeue();
                // Wake another waiting worker so any remaining items are picked up promptly.
                Monitor.Pulse(m_lockObj);
            }

            // Run the work item without holding the lock so other workers can proceed.
            try
            {
                work();
            }
            catch (Exception ex)
            {
                // A failing work item must not take the worker thread down with it.
                Console.WriteLine("An unhandled exception occured!\n:{0}", ex.Message);
            }
        }
    }
}
小霸王臭丫头 2024-07-18 18:22:23

您可能应该使用内置线程池。 运行代码时,我注意到您启动了一堆线程,但由于队列计数小于 1,您只需退出,这种情况就会继续,直到队列实际填充,然后您的下一个线程处理所有内容。 这是一个非常昂贵的过程。 仅当您有事情要做时才应该启动线程。

You should probably use the built-in thread pool. When running your code I noticed that you're spinning up a bunch of threads, but since the queue count is < 1 they just exit; this continues until the queue is actually populated, and then your next thread processes everything. This is a very expensive process. You should only spin up threads if you have something to do.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文