C# - Passing data from ThreadPool threads back to the main thread

Posted 2024-10-03 22:03:27

Current implementation: it waits until parallelCount values have been collected, uses the ThreadPool to process them, waits until all threads complete, then collects another set of values, and so on.

Code:

private static int parallelCount = 5;
private int taskIndex;
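private object[] paramObjects = new object[parallelCount];

// Not declared in the original post; a ManualResetEvent is assumed here
// because the code calls Reset(), Set() and WaitOne() on it.
private ManualResetEvent resetEvent = new ManualResetEvent(false);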

// Each ThreadPool thread should access only one item of the array, 
// release object when done, to be used by another thread
private object[] reusableObjects = new object[parallelCount];     

private void MultiThreadedGenerate(object paramObject)
{
    paramObjects[taskIndex] = paramObject;
    taskIndex++;

    if (taskIndex == parallelCount)
    { 
        MultiThreadedGenerate();

        // Reset
        taskIndex = 0;
    }
}

/*
 * Called when 'paramObjects' array gets filled
 */
private void MultiThreadedGenerate()
{
    // Arrays expose Length, not Count
    int remainingToGenerate = paramObjects.Length;

    resetEvent.Reset();

    for (int i = 0; i < paramObjects.Length; i++)
    {
        ThreadPool.QueueUserWorkItem(delegate(object obj)
        {
            try
            {
                int currentIndex = (int) obj;       

                Generate(currentIndex, paramObjects[currentIndex], reusableObjects[currentIndex]);
            }
            finally
            {
                if (Interlocked.Decrement(ref remainingToGenerate) == 0)
                {
                    resetEvent.Set();
                }
            }
        }, i);
    }

    resetEvent.WaitOne();    
}

I've seen significant performance improvements with this approach; however, there are a number of issues to consider:

[1] Collecting values in paramObjects and synchronizing with resetEvent could be avoided, as there is no dependency between the threads (or between the current set of values and the next set). I'm only doing this to manage access to reusableObjects: when a set of paramObjects is done processing, I know that all objects in reusableObjects are free, so taskIndex is reset and each new task in the next set of values has its own unique 'reusableObj' to work with.

[2] There is no real connection between the size of reusableObjects and the number of threads the ThreadPool uses. I might initialize reusableObjects with 10 objects while, due to some limitation, the ThreadPool can run only 3 threads for my MultiThreadedGenerate() method, in which case I'm wasting memory.

So, getting rid of paramObjects, how can the above code be refined so that as soon as one thread completes its job, it returns the taskIndex (or the reusableObj) it used and no longer needs, making it available for the next value? Also, the code should create a reusable object and add it to some collection only when there is demand for it. Is using a Queue here a good idea?

Thank you.

Comments (1)

脱离于你 2024-10-10 22:03:27

There's really no reason to do your own manual threading and task management any more. You could restructure this to a more loosely-coupled model using Task Parallel Library (and possibly System.Collections.Concurrent for result collation).
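As a rough illustration of that restructuring, here is a minimal sketch using Parallel.ForEach with per-task local state, so that a reusable object is created only when the scheduler actually starts a worker. ReusableBuffer, ParallelGenerateSketch and the body of Generate are hypothetical stand-ins, not the poster's real types; results are gathered in a ConcurrentBag and read on the calling thread once the loop returns.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the poster's reusable object.
public sealed class ReusableBuffer
{
    public int[] Scratch = new int[16];
}

public static class ParallelGenerateSketch
{
    // Hypothetical stand-in for the poster's Generate(...) method.
    private static int Generate(int input, ReusableBuffer buffer)
    {
        buffer.Scratch[0] = input * 2;
        return buffer.Scratch[0];
    }

    public static List<int> Run(IEnumerable<int> inputs)
    {
        // Thread-safe collection for handing results back to the caller.
        var results = new ConcurrentBag<int>();

        Parallel.ForEach(
            inputs,
            // localInit: runs once per worker task the loop starts,
            // so buffers are only allocated on demand.
            () => new ReusableBuffer(),
            // body: each worker task reuses its own buffer for every item it handles.
            (input, loopState, buffer) =>
            {
                results.Add(Generate(input, buffer));
                return buffer;
            },
            // localFinally: nothing to clean up in this sketch.
            buffer => { });

        // Parallel.ForEach blocks until all items are done, so it is safe to read here.
        var sorted = new List<int>(results);
        sorted.Sort();
        return sorted;
    }
}
```

Because each buffer belongs to exactly one worker task for the lifetime of the loop, no index bookkeeping or reset event is needed. Note that the loop may start more worker tasks than there are threads, so the number of buffers created is bounded by the scheduler rather than by a fixed parallelCount.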

Performance could be further improved if you don't need to wait for a full complement of work before handing off each Task for processing.
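A hedged sketch of that idea: each value is handed to a Task the moment it arrives, and reusable objects live in a ConcurrentBag that acts as a simple pool, growing only when a task finds it empty. PooledGenerator, Submit, WaitForResults and the doubling "generate" step are all hypothetical names and logic made up for illustration; an int[] stands in for the poster's reusable object.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class PooledGenerator
{
    // Pool of reusable scratch objects; empty until a task needs one.
    private readonly ConcurrentBag<int[]> pool = new ConcurrentBag<int[]>();
    // Results handed back to the main thread.
    private readonly ConcurrentQueue<int> results = new ConcurrentQueue<int>();
    // Only touched by the submitting (main) thread.
    private readonly List<Task> pending = new List<Task>();
    private int created;

    // Number of scratch objects allocated so far.
    public int CreatedCount { get { return created; } }

    // Call from the main thread for each incoming value; no batching.
    public void Submit(int value)
    {
        pending.Add(Task.Factory.StartNew(() =>
        {
            int[] scratch;
            if (!pool.TryTake(out scratch))
            {
                // No free object available: create one on demand.
                scratch = new int[16];
                Interlocked.Increment(ref created);
            }
            try
            {
                scratch[0] = value * 2;      // hypothetical "generate" work
                results.Enqueue(scratch[0]);
            }
            finally
            {
                pool.Add(scratch);           // release for the next task
            }
        }));
    }

    // Call from the main thread: waits for outstanding tasks, then drains the results.
    public List<int> WaitForResults()
    {
        Task.WaitAll(pending.ToArray());
        pending.Clear();

        var list = new List<int>();
        int item;
        while (results.TryDequeue(out item)) list.Add(item);
        list.Sort();
        return list;
    }
}
```

The main thread would call Submit for every value and WaitForResults only when it actually needs the output. The pool never holds more objects than the peak number of tasks that ran concurrently, which ties memory use to real demand instead of a preset array size.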

TPL came along in .NET 4.0 but was back-ported to .NET 3.5 as a separate download.
