我的自定义线程池有什么问题?
我创建了一个自定义线程池实用程序,但似乎有一个我找不到的问题。
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
namespace iWallpaper.S3Uploader
{
// Generic work queue. Items passed to Execute are enqueued; a dispatcher thread started in the
// constructor runs Worker, which spawns short-lived threads that each try to process one item
// through the delegate stored in DoWork.
// NOTE(review): the snippet refers to FileUploadQueueManager<T> (Instance property and Nested),
// which is not declared anywhere in the visible code - presumably an old name of this class; confirm.
public class QueueManager<T>
{
// Non-generic Queue wrapped by Queue.Synchronized; items are cast back to T in Proceed.
private readonly Queue queue = Queue.Synchronized(new Queue());
// Set by Add after every enqueue; Worker waits on it when the queue is empty. Created signaled (true).
private readonly AutoResetEvent res = new AutoResetEvent(true);
// Set by Proceed; Worker waits on it when Num_Of_Threads is not below maxThread. Created signaled (true).
private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
// Initial count 1, maximum 4. Proceed is the only user and pairs each WaitOne with one Release,
// so the count never rises above 1 and ProceedItem runs on one thread at a time.
private readonly Semaphore sem = new Semaphore(1, 4);
// Dispatcher thread that runs Worker; assigned once in the constructor.
private readonly Thread thread;
// Delegate invoked for each item; overwritten by every call to Execute.
private Action<T> DoWork;
// Counter incremented/decremented by Proceed with Interlocked and read by Worker.
private int Num_Of_Threads;
// Private constructor: resets the counter, sets maxThread to 5 and starts the dispatcher thread.
private QueueManager()
{
Num_Of_Threads = 0;
maxThread = 5;
thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
thread.Start();
// log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
}
// Limit that Worker compares Num_Of_Threads against; set to 5 in the constructor and
// overwritten by Execute.
public int maxThread { get; set; }
// Returns the instance held by Nested.
// NOTE(review): the declared type FileUploadQueueManager<T> is not defined in this snippet.
public static FileUploadQueueManager<T> Instance
{
get { return Nested.instance; }
}
/// <summary>
/// Stores the action and thread limit, then enqueues every item of the list.
/// Returns as soon as the items are enqueued; it does not wait for them to be processed.
/// </summary>
/// <param name="list">Items to enqueue</param>
/// <param name="action">Action to run on each item; replaces the previously stored DoWork</param>
/// <param name="MaxThreads">New value for maxThread</param>
public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
{
maxThread = MaxThreads;
DoWork = action;
foreach (T item in list)
{
Add(item);
}
}
// Runs the action on every item on the calling thread; forwards to the overload below with 0.
public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
{
ExecuteNoThread(list, action, 0);
}
// Runs the action on every item on the calling thread. MaxThreads is not used in the body.
public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
{
foreach (T wallpaper in list)
{
action(wallpaper);
}
}
/// <summary>
/// Same as the three-argument Execute with MaxThreads set to 10.
/// </summary>
/// <param name="list">Items to enqueue</param>
/// <param name="action">Action to run on each item</param>
public void Execute(IEnumerable<T> list, Action<T> action)
{
Execute(list, action, 10);
}
// Enqueues one item while holding a lock on the queue object, then signals res.
private void Add(T item)
{
lock (queue)
{
queue.Enqueue(item);
}
res.Set();
}
// Dispatcher loop executed by the thread created in the constructor; it never exits.
private void Worker()
{
while (true)
{
// Waits only when the queue is empty at this instant; after waking up the loop continues
// without re-checking the queue.
if (queue.Count == 0)
{
res.WaitOne();
}
// Num_Of_Threads is incremented inside Proceed (on the new thread), not here, so this
// loop can start further threads before the ones already started have been counted.
if (Num_Of_Threads < maxThread)
{
var t = new Thread(Proceed);
t.Start();
}
else
{
res_thr.WaitOne();
}
}
}
// Body of each spawned thread: processes at most one queued item.
// There is no try/finally, so if ProceedItem throws, sem.Release, res_thr.Set and the
// decrement below are all skipped.
private void Proceed()
{
Interlocked.Increment(ref Num_Of_Threads);
// Count and Dequeue are two separate calls on the synchronized wrapper; nothing here makes
// the pair atomic, so another Proceed thread can take the last item in between.
if (queue.Count > 0)
{
var item = (T) queue.Dequeue();
sem.WaitOne();
ProceedItem(item);
sem.Release();
}
// res_thr is signaled before the counter is decremented, so Worker can wake up, still read
// the old count and go back to waiting on res_thr.
res_thr.Set();
Interlocked.Decrement(ref Num_Of_Threads);
}
// Invokes DoWork on the item (if a delegate is stored) and then updates the console title
// while holding a lock on the object returned by Instance.
private void ProceedItem(T activity)
{
if (DoWork != null)
DoWork(activity);
lock (Instance)
{
// {0} is filled from the `thread` field, i.e. the dispatcher thread created in the
// constructor, not the thread executing this method, so it is the same value for every item.
Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
Num_Of_Threads);
}
}
#region Nested type: Nested
// Holder for the single instance returned by Instance.
protected class Nested
{
// Explicit static constructor to tell C# compiler
// not to mark type as beforefieldinit
// NOTE(review): no explicit static constructor is actually declared in this class, and the
// initializer constructs FileUploadQueueManager<T>, a type not defined in this snippet.
internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>();
}
#endregion
}
问题
就在这里:
Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
Num_Of_Threads);
标题中总是有一个线程 id。 并且程序似乎在一个线程中工作。
示例用法:
// Hands eight ints to the singleton together with a lambda that prints each element;
// 5 is passed as the MaxThreads argument of Execute.
var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
QueueManager<int>.Instance.Execute(i_list,
i =>
{
Console.WriteLine("Some action under element number {0}", i);
}, 5);
PS:这很混乱,但我仍在努力。
I've created a custom thread pool utility, but there seems to be a problem that I cannot find.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
namespace iWallpaper.S3Uploader
{
// Generic work queue. Items passed to Execute are enqueued; a dispatcher thread started in the
// constructor runs Worker, which spawns short-lived threads that each try to process one item
// through the delegate stored in DoWork.
// NOTE(review): the snippet refers to FileUploadQueueManager<T> (Instance property and Nested),
// which is not declared anywhere in the visible code - presumably an old name of this class; confirm.
public class QueueManager<T>
{
// Non-generic Queue wrapped by Queue.Synchronized; items are cast back to T in Proceed.
private readonly Queue queue = Queue.Synchronized(new Queue());
// Set by Add after every enqueue; Worker waits on it when the queue is empty. Created signaled (true).
private readonly AutoResetEvent res = new AutoResetEvent(true);
// Set by Proceed; Worker waits on it when Num_Of_Threads is not below maxThread. Created signaled (true).
private readonly AutoResetEvent res_thr = new AutoResetEvent(true);
// Initial count 1, maximum 4. Proceed is the only user and pairs each WaitOne with one Release,
// so the count never rises above 1 and ProceedItem runs on one thread at a time.
private readonly Semaphore sem = new Semaphore(1, 4);
// Dispatcher thread that runs Worker; assigned once in the constructor.
private readonly Thread thread;
// Delegate invoked for each item; overwritten by every call to Execute.
private Action<T> DoWork;
// Counter incremented/decremented by Proceed with Interlocked and read by Worker.
private int Num_Of_Threads;
// Private constructor: resets the counter, sets maxThread to 5 and starts the dispatcher thread.
private QueueManager()
{
Num_Of_Threads = 0;
maxThread = 5;
thread = new Thread(Worker) {Name = "S3Uploader EventRegisterer"};
thread.Start();
// log.Info(String.Format("{0} [QUEUE] FileUploadQueueManager created", DateTime.Now.ToLongTimeString()));
}
// Limit that Worker compares Num_Of_Threads against; set to 5 in the constructor and
// overwritten by Execute.
public int maxThread { get; set; }
// Returns the instance held by Nested.
// NOTE(review): the declared type FileUploadQueueManager<T> is not defined in this snippet.
public static FileUploadQueueManager<T> Instance
{
get { return Nested.instance; }
}
/// <summary>
/// Stores the action and thread limit, then enqueues every item of the list.
/// Returns as soon as the items are enqueued; it does not wait for them to be processed.
/// </summary>
/// <param name="list">Items to enqueue</param>
/// <param name="action">Action to run on each item; replaces the previously stored DoWork</param>
/// <param name="MaxThreads">New value for maxThread</param>
public void Execute(IEnumerable<T> list, Action<T> action, int MaxThreads)
{
maxThread = MaxThreads;
DoWork = action;
foreach (T item in list)
{
Add(item);
}
}
// Runs the action on every item on the calling thread; forwards to the overload below with 0.
public void ExecuteNoThread(IEnumerable<T> list, Action<T> action)
{
ExecuteNoThread(list, action, 0);
}
// Runs the action on every item on the calling thread. MaxThreads is not used in the body.
public void ExecuteNoThread(IEnumerable<T> list, Action<T> action, int MaxThreads)
{
foreach (T wallpaper in list)
{
action(wallpaper);
}
}
/// <summary>
/// Same as the three-argument Execute with MaxThreads set to 10.
/// </summary>
/// <param name="list">Items to enqueue</param>
/// <param name="action">Action to run on each item</param>
public void Execute(IEnumerable<T> list, Action<T> action)
{
Execute(list, action, 10);
}
// Enqueues one item while holding a lock on the queue object, then signals res.
private void Add(T item)
{
lock (queue)
{
queue.Enqueue(item);
}
res.Set();
}
// Dispatcher loop executed by the thread created in the constructor; it never exits.
private void Worker()
{
while (true)
{
// Waits only when the queue is empty at this instant; after waking up the loop continues
// without re-checking the queue.
if (queue.Count == 0)
{
res.WaitOne();
}
// Num_Of_Threads is incremented inside Proceed (on the new thread), not here, so this
// loop can start further threads before the ones already started have been counted.
if (Num_Of_Threads < maxThread)
{
var t = new Thread(Proceed);
t.Start();
}
else
{
res_thr.WaitOne();
}
}
}
// Body of each spawned thread: processes at most one queued item.
// There is no try/finally, so if ProceedItem throws, sem.Release, res_thr.Set and the
// decrement below are all skipped.
private void Proceed()
{
Interlocked.Increment(ref Num_Of_Threads);
// Count and Dequeue are two separate calls on the synchronized wrapper; nothing here makes
// the pair atomic, so another Proceed thread can take the last item in between.
if (queue.Count > 0)
{
var item = (T) queue.Dequeue();
sem.WaitOne();
ProceedItem(item);
sem.Release();
}
// res_thr is signaled before the counter is decremented, so Worker can wake up, still read
// the old count and go back to waiting on res_thr.
res_thr.Set();
Interlocked.Decrement(ref Num_Of_Threads);
}
// Invokes DoWork on the item (if a delegate is stored) and then updates the console title
// while holding a lock on the object returned by Instance.
private void ProceedItem(T activity)
{
if (DoWork != null)
DoWork(activity);
lock (Instance)
{
// {0} is filled from the `thread` field, i.e. the dispatcher thread created in the
// constructor, not the thread executing this method, so it is the same value for every item.
Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
Num_Of_Threads);
}
}
#region Nested type: Nested
// Holder for the single instance returned by Instance.
protected class Nested
{
// Explicit static constructor to tell C# compiler
// not to mark type as beforefieldinit
// NOTE(review): no explicit static constructor is actually declared in this class, and the
// initializer constructs FileUploadQueueManager<T>, a type not defined in this snippet.
internal static readonly QueueManager<T> instance = new FileUploadQueueManager<T>();
}
#endregion
}
}
Problem is here:
Console.Title = string.Format("ThrId:{0}/{4}, {1}, Activity({2} left):{3}",
thread.ManagedThreadId, DateTime.Now, queue.Count, activity,
Num_Of_Threads);
There is always only ONE thread id in the title, and the program seems to be working in one thread.
Sample usage:
// Hands eight ints to the singleton together with a lambda that prints each element;
// 5 is passed as the MaxThreads argument of Execute.
var i_list = new int[] {1, 2, 4, 5, 6, 7, 8, 6};
QueueManager<int>.Instance.Execute(i_list,
i =>
{
Console.WriteLine("Some action under element number {0}", i);
}, 5);
P.S.: it's pretty messy, but I'm still working on it.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
我查看了您的代码,这是我看到的几个问题。
Proceed 方法不是线程安全的。 这两行就是问题
Worker 和 Proceed 之间存在可能导致死锁的竞争条件。 应切换以下两行代码。
代码:
第一行将解除 Worker 方法的阻塞。 如果它运行得足够快,它会重新进入循环,发现 Num_Of_Threads 仍然不小于 maxThread,于是立即回到 res_thr.WaitOne()。 如果当前没有其他线程正在运行,那么这将导致代码中出现死锁。 当最大线程数较少(例如 1)时,很容易遇到这种情况。 交换这两行代码的顺序应该可以解决问题。
I looked through your code and here are a couple of issues I saw.
The Proceed method is not thread safe. These two lines are the issue: the check "if (queue.Count > 0)" and the following "var item = (T) queue.Dequeue();".
Using a synchronized queue only guarantees that individual accesses are safe. So both the .Count and the .Dequeue method won't mess with the internal structure of the queue. However, imagine the scenario where two threads run these lines of code at the same time with a queue of count 1: both can see a count greater than 0 and both call Dequeue, and the second call fails because the queue is already empty.
There is a race condition between Worker and Proceed that can lead to deadlock. The following two lines of code should be switched.
Code: "res_thr.Set();" followed by "Interlocked.Decrement(ref Num_Of_Threads);"
The first line will unblock the Worker method. If it runs quickly enough it will go back through the loop, notice that Num_Of_Threads is still not less than maxThread (the decrement has not happened yet) and go right back into res_thr.WaitOne(). If no other threads are currently running then this will lead to a deadlock in your code. This is very easy to hit with a low number of maximum threads (say 1). Inverting these two lines of code should fix the issue.
编写健壮的线程代码并非易事。 您可以查看许多线程池以供参考,但还要注意并行扩展(可作为 CTP,或更高版本的 .NET 4.0)包括许多开箱即用的附加线程构造(在 TPL/ CCR)。 例如,
Parallel.For
/Parallel.ForEach
,它处理工作窃取,并有效地处理可用内核。有关预滚动线程池的示例,请参阅 Jon Skeet 的
CustomThreadPool
Writing robust threaded code is not trivial. There are numerous thread-pools around that you might look at for reference, but also note that Parallel Extensions (available as CTP, or later in .NET 4.0) includes a lot of additional threading constructs out-of-the-box (in the TPL/CCR). For example,
Parallel.For
/Parallel.ForEach
, which deal with work-stealing and handle the available cores effectively. For an example of a pre-rolled thread-pool, see Jon Skeet's
CustomThreadPool
here.
我认为你可以大大简化这段代码。
这是我使用的线程池的修改形式(我没有测试修改):
您唯一需要的同步原语是一个 Monitor,锁定在线程池对象上。 您不需要信号量,也不需要重置事件。
I think you can simplify things considerably.
Here is a modified form (I didn't test the modifications) of the thread pool I use:
The only synchronization primitive you need is a Monitor, locked on the thread pool. You don't need a semaphore or the reset events.
您可能应该使用内置线程池。 运行代码时,我注意到您启动了一堆线程,但由于队列计数小于 1,您只需退出,这种情况就会继续,直到队列实际填充,然后您的下一个线程处理所有内容。 这是一个非常昂贵的过程。 仅当您有事情要做时才应该启动线程。
You should probably use the built-in thread pool. When running your code I noticed that you're spinning up a bunch of threads, but since the queue count is < 1 they just exit; this continues until the queue is actually populated, and then your next thread processes everything. This is a very expensive process. You should only spin up threads if you have something to do.