What is the best way to have multiple threads do work and wait for all of them to complete?

I'm writing a simple app (for my wife no less :-P ) that does some image manipulation (resizing, timestamping etc) for a potentially large batch of images. So I'm writing a library that can do this both synchronously and asynchronously. I decided to use the Event-based Asynchronous Pattern. When using this pattern, you need to raise an event when the work has been completed. This is where I'm having problems knowing when it's done. So basically, in my DownsizeAsync method (async method for downsizing images) I'm doing something like this:

    public void DownsizeAsync(string[] files, string destination)
    {
        foreach (var name in files)
        {
            string temp = name; //countering the closure issue
            ThreadPool.QueueUserWorkItem(f =>
            {
                string newFileName = this.DownsizeImage(temp, destination);
                this.OnImageResized(newFileName);
            });
        }
     }

The tricky part now is knowing when they are all complete.

Here's what I've considered: Using ManualResetEvents like here: http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx But the problem I came across is that you can only wait for 64 or less events. I may have many many more images.

Second option: Have a counter that counts the images that have been done, and raise the event when the count reaches the total:

public void DownsizeAsync(string[] files, string destination)
{
    foreach (var name in files)
    {
        string temp = name; //countering the closure issue
        ThreadPool.QueueUserWorkItem(f =>
        {
            string newFileName = this.DownsizeImage(temp, destination);
            this.OnImageResized(newFileName);
            total++;
            if (total == files.Length)
            {
                this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
            }
        });
    }
}

private volatile int total = 0;

Now this feels "hacky" and I'm not entirely sure if that's thread safe.

So, my question is, what's the best way of doing this? Is there another way to synchronize all threads? Should I not be using a ThreadPool? Thanks!!

UPDATE Based on feedback in the comments and from a few answers I've decided to take this approach:

First, I created an extension method that batches an enumerable into "batches":

    public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount)
    {
        for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount))
        {
            yield return s.Take(batchCount);
        }
    }

Basically, if you do something like this:

        foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10))
        {
            foreach (int i in batch)
            {
                Console.Write("{0} ", i);
            }
            Console.WriteLine();
        }

You get this output:

1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95

The idea being that (as someone in the comments pointed out) there's no need to create a separate thread for each image. Therefore, I'll batch the images into [machine.cores * 2] number of batches. Then, I'll use my second approach which is simply to keep a counter going and when the counter reaches the total I'm expecting, I'll know I'm done.

The reason I'm now convinced that it is in fact thread safe is that I've marked the total variable as volatile, which according to MSDN:

The volatile modifier is usually used for a field that is accessed by multiple threads without using the lock statement to serialize access. Using the volatile modifier ensures that one thread retrieves the most up-to-date value written by another thread

means I should be in the clear (if not, please let me know!!)

So here's the code I'm going with:

    public void DownsizeAsync(string[] files, string destination)
    {
        int cores = Environment.ProcessorCount * 2;
        int batchAmount = files.Length / cores;

        foreach (var batch in files.GetBatches(batchAmount))
        {
            var temp = batch.ToList(); //counter closure issue
            ThreadPool.QueueUserWorkItem(b =>
            {
                foreach (var item in temp)
                {
                    string newFileName = this.DownsizeImage(item, destination);
                    this.OnImageResized(newFileName);
                    total++;
                    if (total == files.Length)
                    {
                        this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                    }
                }
            });
        }
    }

I'm open to feedback as I'm in no way an expert on multithreading, so if anyone sees any issue with this, or has a better idea, please let me know. (Yes, this is just a home made app, but I have some ideas on how I can use the knowledge I gain here to improve our Search / Index service we use at work.) For now I'll keep this question open till I feel like I'm using the right approach. Thanks everyone for your help.

8 Answers

记忆で 2024-08-22 00:39:09

The simplest thing would be to create new threads, and then call Thread.Join on each of them. You could use a semaphore or something like it - but it's probably easier to just create new threads.

In .NET 4.0 you could use Parallel Extensions to do this quite easily with tasks.

As another alternative which would use the threadpool, you could create a delegate and call BeginInvoke on it, to return an IAsyncResult - you can then get the WaitHandle for each result via the AsyncWaitHandle property, and call WaitHandle.WaitAll.

EDIT: As pointed out in comments, you can only call WaitAll with up to 64 handles at a time on some implementations. Alternatives could be calling WaitOne on each of them in turn, or calling WaitAll with batches. It won't really matter, so long as you're doing it from a thread which isn't going to block the threadpool. Also note that you can't call WaitAll from an STA thread.
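
A minimal sketch of the first option (plain threads plus Thread.Join), assuming the asker's DownsizeImage method; the other names are illustrative:

    var threads = new List<Thread>();
    foreach (string file in files)
    {
        string temp = file;                 // avoid capturing the loop variable
        var thread = new Thread(() => DownsizeImage(temp, destination));
        thread.Start();
        threads.Add(thread);
    }
    foreach (Thread thread in threads)
    {
        thread.Join();                      // blocks until that thread has finished
    }
    // once every Join has returned, all the work is done and the completed event can be raised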

回心转意 2024-08-22 00:39:09

You still want to use the ThreadPool because it will manage the number of threads it runs simultaneously. I ran into a similar issue recently and solved it like this:

var dispatcher = new ThreadPoolDispatcher();
dispatcher = new ChunkingDispatcher(dispatcher, 10);

foreach (var image in images)
{
    dispatcher.Dispatch(new ResizeJob(image));
}

dispatcher.WaitForJobsToFinish();

The IDispatcher and IJob look like this:

public interface IJob
{
    void Execute();
}
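
// The IDispatcher interface itself isn't shown in the answer; judging from how it's
// used by the classes below, it presumably looks something like this:
public interface IDispatcher
{
    void Dispatch(IJob job);
    void WaitForJobsToFinish();
}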

public class ThreadPoolDispatcher : IDispatcher
{
    private IList<ManualResetEvent> resetEvents = new List<ManualResetEvent>();

    public void Dispatch(IJob job)
    {
        var resetEvent = CreateAndTrackResetEvent();
        var worker = new ThreadPoolWorker(job, resetEvent);
        ThreadPool.QueueUserWorkItem(new WaitCallback(worker.ThreadPoolCallback));
    }

    private ManualResetEvent CreateAndTrackResetEvent()
    {
        var resetEvent = new ManualResetEvent(false);
        resetEvents.Add(resetEvent);
        return resetEvent;
    }

    public void WaitForJobsToFinish()
    {
        WaitHandle.WaitAll(resetEvents.ToArray() ?? new ManualResetEvent[] { });
        resetEvents.Clear();
    }
}

I then used a decorator to chunk the use of the ThreadPool:

public class ChunkingDispatcher : IDispatcher
{
    private IDispatcher dispatcher;
    private int numberOfJobsDispatched;
    private int chunkSize;

    public ChunkingDispatcher(IDispatcher dispatcher, int chunkSize)
    {
        this.dispatcher = dispatcher;
        this.chunkSize = chunkSize;
    }

    public void Dispatch(IJob job)
    {
        dispatcher.Dispatch(job);

        if (++numberOfJobsDispatched % chunkSize == 0)
            WaitForJobsToFinish();
    }

    public void WaitForJobsToFinish()
    {
        dispatcher.WaitForJobsToFinish();
    }
}

The IDispatcher abstraction works pretty well for swapping out your threading technique. I have another implementation that is a SingleThreadedDispatcher and you could make a ThreadStart version like Jon Skeet suggested. Then it's easy to run each one and see what kind of performance you get. The SingleThreadedDispatcher is good when debugging your code or when you don't want to kill the processor on your box.
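
The SingleThreadedDispatcher isn't shown here; presumably it just runs each job inline on the calling thread, something like this (a sketch, not the actual implementation):

public class SingleThreadedDispatcher : IDispatcher
{
    public void Dispatch(IJob job)
    {
        job.Execute(); // run the job synchronously on the calling thread
    }

    public void WaitForJobsToFinish()
    {
        // nothing to do: every job already finished inside Dispatch
    }
}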

Edit: I forgot to add the code for ThreadPoolWorker:

public class ThreadPoolWorker
{
    private IJob job;
    private ManualResetEvent doneEvent;

    public ThreadPoolWorker(IJob job, ManualResetEvent doneEvent)
    {
        this.job = job;
        this.doneEvent = doneEvent;
    }

    public void ThreadPoolCallback(object state)
    {
        try
        {
            job.Execute();
        }
        finally
        {
            doneEvent.Set();
        }
    }
}

梦里泪两行 2024-08-22 00:39:09

The simplest and most efficient solution would be to use a counter and make it thread safe. This consumes less memory and scales to a higher number of threads.

Here is a sample

int itemCount = 0;
for (int i = 0; i < 5000; i++)
{
    Interlocked.Increment(ref itemCount);

    ThreadPool.QueueUserWorkItem(x=>{
        try
        {
            //code logic here.. sleep is just for demo
            Thread.Sleep(100);
        }
        finally
        {
            Interlocked.Decrement(ref itemCount);
        }
    });
}

while (itemCount > 0)
{
    Console.WriteLine("Waiting for " + itemCount + " threads...");
    Thread.Sleep(100);
}
Console.WriteLine("All Done!");

埋情葬爱 2024-08-22 00:39:09

I've used SmartThreadPool with much success to cope with this problem. There is also a CodePlex site about the assembly.

SmartThreadPool can help with other problems as well, such as when some threads cannot run at the same time while others can.

洋洋洒洒 2024-08-22 00:39:09

I use a static utility method to examine all the individual wait handles:

    public static void WaitAll(WaitHandle[] handles)
    {
        if (handles == null)
            throw new ArgumentNullException("handles",
                "WaitHandle[] handles was null");
        foreach (WaitHandle wh in handles) wh.WaitOne();
    }

Then in my main thread, I create a List of these wait handles, and for each delegate I put in my ThreadPool Queue, I add the wait handle to the List...

 List<WaitHandle> waitHndls = new List<WaitHandle>();
 foreach (iterator logic )
 {
      ManualResetEvent txEvnt = new ManualResetEvent(false);

      ThreadPool.QueueUserWorkItem(
           delegate
               {
                   try
                   {
                       // Code to process each task...
                   }
                   // Finally, set each wait handle when done
                   finally { lock (locker) txEvnt.Set(); } 
               });
      waitHndls.Add(txEvnt);  // Add wait handle to List
 }
 util.WaitAll(waitHndls.ToArray());   // Check all wait Handles in List

痴者 2024-08-22 00:39:09

.NET 4.0 makes multi-threading even easier (although you can still shoot yourself in the foot with side effects).
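
For example, Parallel.ForEach from the Parallel Extensions takes care of the "wait for everything to finish" part for you (a minimal sketch, assuming the asker's DownsizeImage and event methods):

    // Parallel.ForEach partitions the files across worker threads and only
    // returns once every item has been processed.
    Parallel.ForEach(files, file =>
    {
        string newFileName = DownsizeImage(file, destination);
        OnImageResized(newFileName);
    });
    OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));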

你是暖光i 2024-08-22 00:39:09

Another option would be to use a Pipe.

You post all the work to be done to the pipe and then read data from the pipe from each thread. When the pipe is empty, you're done: the threads end themselves and everybody is happy (of course, make sure you first produce all the work, then consume it).
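
A minimal sketch of that idea, using a plain Queue<string> guarded by a lock as the "pipe" and assuming the asker's DownsizeImage method (the other names are illustrative):

    var workQueue = new Queue<string>(files);             // produce all the work up front
    object queueLock = new object();
    var workers = new List<Thread>();

    for (int i = 0; i < Environment.ProcessorCount; i++)
    {
        var worker = new Thread(() =>
        {
            while (true)
            {
                string file;
                lock (queueLock)
                {
                    if (workQueue.Count == 0) return;     // pipe is empty: this worker ends itself
                    file = workQueue.Dequeue();
                }
                DownsizeImage(file, destination);
            }
        });
        worker.Start();
        workers.Add(worker);
    }

    foreach (Thread worker in workers)
        worker.Join();                                    // all workers have ended => all work is done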

反差帅 2024-08-22 00:39:09

I suggest putting the untouched images in a queue, and as you read from the queue, launch a thread and insert its System.Threading.Thread.ManagedThreadId property into a dictionary along with the file name. This way your UI can list both pending and active files.

When each thread completes it invokes a callback routine, passing back its ManagedThreadId. This callback (passed as a delegate to the thread) removes the thread's id from the dictionary, launches another thread from the queue, and updates the UI.

When both the queue and the dictionary are empty, you're done.

Slightly more complicated but this way you get a responsive UI, you can easily control the number of active threads, and you can see what's in flight. Collect statistics. Get fancy with WPF and put up progress bars for each file. She can't help but be impressed.
