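What is the best way to have multiple threads do work and wait for all of them to complete?
I'm writing a simple app (for my wife, no less :-P) that does some image manipulation (resizing, timestamping, etc.) on a potentially large batch of images. So I'm writing a library that can do this both synchronously and asynchronously. I decided to use the Event-based Asynchronous Pattern. When using this pattern, you need to raise an event when the work has been completed. This is where I'm having trouble knowing when it's done. Basically, in my DownsizeAsync method (the async method for downsizing images) I'm doing something like this: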
public void DownsizeAsync(string[] files, string destination)
{
    foreach (var name in files)
    {
        string temp = name; // countering the closure issue
        ThreadPool.QueueUserWorkItem(f =>
        {
            string newFileName = this.DownsizeImage(temp, destination);
            this.OnImageResized(newFileName);
        });
    }
}
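Now the tricky part is knowing when they are all complete.
Here's what I've considered: using ManualResetEvents as shown here: http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx But the problem I ran into is that you can only wait on 64 or fewer events. I may have many, many more images.
Second option: keep a counter of the images that have been completed, and raise the event when the count reaches the total: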
public void DownsizeAsync(string[] files, string destination)
{
    foreach (var name in files)
    {
        string temp = name; // countering the closure issue
        ThreadPool.QueueUserWorkItem(f =>
        {
            string newFileName = this.DownsizeImage(temp, destination);
            this.OnImageResized(newFileName);
            total++;
            if (total == files.Length)
            {
                this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
            }
        });
    }
}
private volatile int total = 0;
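Now this feels "hacky", and I'm not entirely sure whether it's thread safe.
So, my question is: what's the best way of doing this? Is there another way to synchronize all the threads? Should I not be using a ThreadPool? Thanks!!
UPDATE: Based on feedback in the comments and from a few answers, I've decided to take this approach:
First, I created an extension method that splits an enumerable into "batches":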
public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount)
{
    for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount))
    {
        yield return s.Take(batchCount);
    }
}
Basically, if you do something like this:
foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10))
{
    foreach (int i in batch)
    {
        Console.Write("{0} ", i);
    }
    Console.WriteLine();
}
You get this output:
1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95
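The idea being that (as someone in the comments pointed out) there's no need to create a separate thread for each image. Therefore, I'll split the images into [machine.cores * 2] batches. Then I'll use my second approach, which is simply to keep a counter going, and when the counter reaches the total I'm expecting, I'll know that I'm done.
The reason I'm now convinced that it is in fact thread safe is that I've marked the total variable as volatile, which according to MSDN:
The volatile modifier is usually used for a field that is accessed by multiple threads without using the lock statement to serialize access. Using the volatile modifier ensures that one thread retrieves the most up-to-date value written by another thread.
means that I should be in the clear (if not, please let me know!!)
So here's the code I'm going with: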
public void DownsizeAsync(string[] files, string destination)
{
    int cores = Environment.ProcessorCount * 2;
    int batchAmount = files.Length / cores;
    foreach (var batch in files.GetBatches(batchAmount))
    {
        var temp = batch.ToList(); // counter closure issue
        ThreadPool.QueueUserWorkItem(b =>
        {
            foreach (var item in temp)
            {
                string newFileName = this.DownsizeImage(item, destination);
                this.OnImageResized(newFileName);
                total++;
                if (total == files.Length)
                {
                    this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                }
            }
        });
    }
}
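I'm open to feedback, as I'm in no way an expert on multithreading, so if anyone sees any issue with this, or has a better idea, please let me know. (Yes, this is just a homemade app, but I have some ideas on how I can use the knowledge I gain here to improve the search/index service we use at work.) For now I'll keep this question open until I feel like I'm using the right approach. Thanks, everyone, for your help.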
Answers (8)
The simplest thing would be to create new threads, and then call Thread.Join on each of them. You could use a semaphore or something like it, but it's probably easier to just create new threads.
In .NET 4.0 you could use Parallel Extensions to do this quite easily with tasks.
As another alternative which would use the thread pool, you could create a delegate and call BeginInvoke on it, which returns an IAsyncResult. You can then get the WaitHandle for each result via the AsyncWaitHandle property, and call WaitHandle.WaitAll.
EDIT: As pointed out in comments, you can only call WaitAll with up to 64 handles at a time on some implementations. Alternatives could be calling WaitOne on each of them in turn, or calling WaitAll in batches. It won't really matter, so long as you're doing it from a thread which isn't going to block the thread pool. Also note that you can't call WaitAll from an STA thread.
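A minimal sketch of the Thread.Join suggestion. The Process method is a hypothetical stand-in for the per-image work (DownsizeImage in the question), and the class and method names are illustrative only. One thread per file is only reasonable for small batches; larger ones would want batching as discussed in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ThreadJoinDemo
{
    // Hypothetical stand-in for the per-image work (e.g. DownsizeImage).
    public static string Process(string file)
    {
        return file + ".resized";
    }

    // Starts one dedicated thread per file, then joins every thread.
    public static string[] RunWithThreads(string[] files)
    {
        var results = new string[files.Length];
        var threads = new List<Thread>();
        for (int i = 0; i < files.Length; i++)
        {
            int index = i; // per-iteration copy so each closure sees its own index
            var thread = new Thread(() => { results[index] = Process(files[index]); });
            threads.Add(thread);
            thread.Start();
        }
        foreach (var thread in threads)
        {
            thread.Join(); // blocks until that particular thread has finished
        }
        return results; // every slot is filled once all joins have returned
    }

    public static void Main()
    {
        var files = new[] { "a.jpg", "b.jpg", "c.jpg" };
        Console.WriteLine(string.Join(", ", RunWithThreads(files)));
    }
}
```

Because each thread writes to its own array slot, no locking is needed for the results, and Join has no 64-handle limit.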
You still want to use the ThreadPool because it will manage the number of threads it runs simultaneously. I ran into a similar issue recently and solved it like this:
The IDispatcher and IJob look like this:
And then used a decorator to chunk the use of ThreadPool:
The IDispatcher abstraction works pretty well for swapping out your threading technique. I have another implementation that is a SingleThreadedDispatcher, and you could make a ThreadStart version like Jon Skeet suggested. Then it's easy to run each one and see what kind of performance you get. The SingleThreadedDispatcher is good when debugging your code or when you don't want to kill the processor on your box.
Edit: I forgot to add the code for ThreadPoolWorker:
The simplest and most efficient solution would be to use a counter and make it thread safe. This consumes less memory and scales to a larger number of threads.
Here is a sample
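A minimal sketch of a thread-safe counter, not the answerer's own sample. It assumes a hypothetical Downsizer class with a no-op DownsizeImage placeholder and a DownsizeCompleted event. The key point is that `total++` on a volatile field is still a separate read and write, so two threads can lose an update; Interlocked performs the update atomically and returns the new value.

```csharp
using System;
using System.Threading;

public class Downsizer
{
    public event EventHandler DownsizeCompleted;

    // Hypothetical placeholder for the real per-image work.
    protected virtual void DownsizeImage(string file) { }

    public void DownsizeAsync(string[] files)
    {
        if (files.Length == 0)
        {
            RaiseCompleted(); // nothing queued, so complete immediately
            return;
        }

        int remaining = files.Length; // shared by all work items of this call
        foreach (var name in files)
        {
            string temp = name; // per-iteration copy for the closure
            ThreadPool.QueueUserWorkItem(_ =>
            {
                DownsizeImage(temp);
                // Decrement is atomic and returns the new value, so exactly
                // one work item observes zero and raises the event.
                if (Interlocked.Decrement(ref remaining) == 0)
                {
                    RaiseCompleted();
                }
            });
        }
    }

    private void RaiseCompleted()
    {
        var handler = DownsizeCompleted;
        if (handler != null) handler(this, EventArgs.Empty);
    }

    public static void Main()
    {
        var downsizer = new Downsizer();
        using (var done = new ManualResetEvent(false))
        {
            downsizer.DownsizeCompleted += (s, e) => done.Set();
            downsizer.DownsizeAsync(new[] { "a.jpg", "b.jpg", "c.jpg" });
            done.WaitOne(); // the demo blocks here until the event fires
        }
        Console.WriteLine("All images processed.");
    }
}
```

Keeping the counter in a local variable rather than a field means overlapping calls to DownsizeAsync do not share state. Note that an exception thrown inside a work item is not handled in this sketch.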
I've used SmartThreadPool with much success to cope with this problem. There is also a CodePlex site about the assembly.
SmartThreadPool can help with other problems as well, such as cases where some threads cannot run at the same time while others can.
I use a static utility method to examine all the individual wait handles.
Then in my main thread, I create a List of these wait handles, and for each delegate I put in my ThreadPool queue, I add the wait handle to the List.
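One way such a utility might look, as a sketch under assumptions: the WaitHandleUtil class and its WaitForAll and RunJobs methods are hypothetical names, and the work item is a trivial counter increment standing in for real processing. Waiting on each handle in turn avoids the 64-handle limit of WaitHandle.WaitAll.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class WaitHandleUtil
{
    // Waits on each handle in turn. The total wait ends when the slowest
    // job has signalled, regardless of the order in which jobs finish.
    public static void WaitForAll(IEnumerable<WaitHandle> handles)
    {
        foreach (var handle in handles)
        {
            handle.WaitOne();
        }
    }

    // Queues jobCount work items, one wait handle per item, and returns
    // how many items ran once all handles have been signalled.
    public static int RunJobs(int jobCount)
    {
        var handles = new List<WaitHandle>();
        int processed = 0;
        for (int i = 0; i < jobCount; i++)
        {
            var done = new ManualResetEvent(false); // one handle per work item
            handles.Add(done);
            ThreadPool.QueueUserWorkItem(_ =>
            {
                Interlocked.Increment(ref processed); // stand-in for real work
                done.Set();                           // signal completion
            });
        }

        WaitForAll(handles);
        foreach (var handle in handles)
        {
            handle.Dispose(); // release the OS handles once everything is done
        }
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(RunJobs(100)); // well above the 64-handle WaitAll limit
    }
}
```

This allocates one event per work item, which is the memory cost the counter-based approach avoids.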
.NET 4.0 makes multi-threading even easier (although you can still shoot yourself in the foot with side effects).
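A short sketch of what this can look like with the Task Parallel Library that ships in .NET 4.0. Process is a hypothetical stand-in for the per-image work, and the class and method names are illustrative. Both Parallel.For and Task.WaitAll return only after all the work has finished, so no manual counter or wait handle is needed.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelResizeDemo
{
    // Hypothetical stand-in for the per-image work.
    public static string Process(string file)
    {
        return file + ".resized";
    }

    // Parallel.For blocks the caller until every iteration has completed.
    public static string[] ResizeWithParallelFor(string[] files)
    {
        var results = new string[files.Length];
        Parallel.For(0, files.Length, i => { results[i] = Process(files[i]); });
        return results;
    }

    // One task per file; Task.WaitAll blocks until all tasks have finished.
    public static string[] ResizeWithTasks(string[] files)
    {
        Task<string>[] tasks = files
            .Select(file => Task.Factory.StartNew(() => Process(file)))
            .ToArray();
        Task.WaitAll(tasks);
        return tasks.Select(t => t.Result).ToArray();
    }

    public static void Main()
    {
        var files = new[] { "a.jpg", "b.jpg", "c.jpg" };
        Console.WriteLine(string.Join(", ", ResizeWithParallelFor(files)));
        Console.WriteLine(string.Join(", ", ResizeWithTasks(files)));
    }
}
```

The side-effect warning still applies: each iteration here writes only to its own array slot, which is why no locking is required.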
Another option would be to use a pipe.
You post all the work to be done to the pipe and then read from the pipe in each thread. When the pipe is empty, you're done, the threads end themselves, and everybody is happy (of course, make sure you produce all the work first, then consume it).
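A sketch of the same idea using an in-process queue rather than an OS pipe, which is a substitution on my part: ConcurrentQueue (available from .NET 4.0) plays the role of the pipe. The PipeDemo class, the ProcessAll method, and the worker count are all hypothetical, and the counter increment stands in for real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class PipeDemo
{
    // Fills the queue first, then lets workerCount threads drain it.
    // Returns the number of items processed.
    public static int ProcessAll(string[] files, int workerCount)
    {
        // Produce everything up front, so an empty queue means "done".
        var queue = new ConcurrentQueue<string>(files);
        int processed = 0;

        var workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(() =>
            {
                string file;
                // TryDequeue is thread safe; each item goes to exactly one worker.
                while (queue.TryDequeue(out file))
                {
                    Interlocked.Increment(ref processed); // stand-in for real work
                }
                // Queue is empty: the worker simply returns and its thread ends.
            });
            workers[i].Start();
        }

        foreach (var worker in workers)
        {
            worker.Join(); // all workers finished means all work is done
        }
        return processed;
    }

    public static void Main()
    {
        var files = new string[200];
        for (int i = 0; i < files.Length; i++) files[i] = "image" + i + ".jpg";
        Console.WriteLine(ProcessAll(files, Environment.ProcessorCount));
    }
}
```

The number of threads is fixed by workerCount rather than by the number of images, which also addresses the concern about creating one thread per image.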
I suggest putting the untouched images in a queue, and as you read from the queue, launch a thread and insert its System.Threading.Thread.ManagedThreadId property into a dictionary along with the file name. This way your UI can list both pending and active files.
When each thread completes, it invokes a callback routine, passing back its ManagedThreadId. This callback (passed as a delegate to the thread) removes the thread's id from the dictionary, launches another thread from the queue, and updates the UI.
When both the queue and the dictionary are empty, you're done.
Slightly more complicated, but this way you get a responsive UI, you can easily control the number of active threads, and you can see what's in flight. Collect statistics. Get fancy with WPF and put up progress bars for each file. She can't help but be impressed.