线程池/WaitHandle资源泄漏/崩溃

发布于 2024-09-09 17:08:35 字数 5261 浏览 7 评论 0原文

我想我可能需要重新考虑我的设计。我很难缩小导致我的计算机完全挂起的错误范围,有时会从 VS 2010 抛出 HRESULT 0x8007000E。

我有一个控制台应用程序(稍后我将其转换为服务),它处理基于以下内容的文件传输:数据库队列。

我正在限制允许传输的线程。这是因为我们连接的某些系统只能包含来自某些帐户的一定数量的连接。

例如,系统 A 只能接受 3 个同时连接(这意味着 3 个独立的线程)。这些线程中的每一个都有自己独特的连接对象,因此我们不应该遇到任何同步问题,因为它们不共享连接。

我们希望循环处理来自这些系统的文件。例如,我们将允许 3 个连接,每个连接最多可以传输 100 个文件。这意味着,要从系统 A 移动 1000 个文件,我们每个周期只能处理 300 个文件,因为允许 3 个线程,每个线程 100 个文件。因此,在此传输的生命周期中,我们将有 10 个线程。我们一次只能运行 3 个。因此,将会有 3 个周期,最后一个周期将仅使用 1 个线程来传输最后 100 个文件。 (3 个线程 x 100 个文件 = 每个周期 300 个文件)

当前的架构示例是:

  1. ) 来执行某些操作。
  2. System.Threading.Timer 每 5 秒检查一次队列,通过调用GetScheduledTask( 无
  3. 如果有工作,创建一个 ThreadPool 线程来处理工作 [工作线程 A]
  4. 工作线程 A 看到有 1000 个文件需要传输
  5. 工作线程 A 看到它只能有 3 个线程运行到从中获取文件的系统
  6. 工作线程 A 启动三个新工作线程 [B,C,D] 并传输
  7. 工作线程 A 等待 B,C,D [WaitHandle.WaitAll(transfersArray)]
  8. 工作线程 A 看到还有队列中有更多文件(现在应该是 700 个)
  9. 工作线程 A 创建一个新数组来等待 [transfersArray = new TransferArray[3] 这是系统 A 的最大值,但可能因系统
  10. 工作 而异线程 A 启动三个新工作线程 [B,C,D] 并等待它们[WaitHandle.WaitAll(transfersArray)]
  11. 重复该过程,直到没有更多文件可以移动。
  12. 工作线程 A 发出信号表明它已完成,

我正在使用 ManualResetEvent 来处理信号。

我的问题是:

  1. 是否有任何明显的情况会导致资源泄漏或我遇到的问题?
  2. 我是否应该在每个 WaitHandle.WaitAll(array) 之后循环遍历数组并调用 array[index].Dispose()?
  3. 任务管理器下此过程的句柄计数缓慢 。
  4. 我正在从 System.Threading.Timer 调用工作线程 A 的初始创建 这样会不会有什么问题呢?该计时器的代码是:(

一些用于调度的类代码)

private ManualResetEvent _ResetEvent;

private void Start()
{
    _IsAlive = true;
    ManualResetEvent transferResetEvent = new ManualResetEvent(false);
    //Set the scheduler timer to 5 second intervals
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}

private void ScheduledTasks_Tick(object state)
{
    ManualResetEvent resetEvent = null;
    try
    {
        resetEvent = (ManualResetEvent)state;
        //Block timer until GetScheduledTasks() finishes
        _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
        GetScheduledTasks();
    }
    finally
    {
        _ScheduledTasks.Change(5000, 5000);
        Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
        resetEvent.Set();
    }
}


private void GetScheduledTask()
{
    try 
    { 
        //Check to see if the database connection is still up
        if (!_IsAlive)
        {
            //Handle
            _ConnectionLostNotification = true;
            return;
        }

        //Get scheduled records from the database
        ISchedulerTask task = null;

        using (DataTable dt = FastSql.ExecuteDataTable(
                _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
        {
            if (dt != null)
            {
                if (dt.Rows.Count == 1)
                {  //Only 1 row is allowed
                    DataRow dr = dt.Rows[0];

                    //Get task information
                    TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                    task = ScheduledTaskFactory.CreateScheduledTask(taskType);

                    task.Description = dr["Description"].ToString();
                    task.IsEnabled = (bool)dr["IsEnabled"];
                    task.IsProcessing = (bool)dr["IsProcessing"];
                    task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                    task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                    task.NextRun = (DateTime)dr["NextRun"];
                    task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                    task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                    task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                    task.SleepMinutes = (int)dr["SleepMinutes"];
                    task.ScheduleId = (int)dr["ScheduleId"];
                    task.CurrentRuns = (int)dr["CurrentRuns"];
                    task.TotalRuns = (int)dr["TotalRuns"];

                    SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                    //Queue up task to worker thread and start
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                }
            }
        }

    }
    catch (Exception ex)
    {
        //Handle
    }
}

private void ThreadProc(object taskObject)
{
    SchedulerTask task = (SchedulerTask)taskObject;
    ScheduledTaskEngine engine = null;
    try
    {
        engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
        engine.StartTask(task.Task);    
    }
    catch (Exception ex)
    {
        //Handle
    }
    finally
    {
        task.TaskResetEvent.Set();
        task.TaskResetEvent.Dispose();
    }
}

I think I may need to re-think my design. I'm having a hard time narrowing down a bug that is causing my computer to completely hang, sometimes throwing an HRESULT 0x8007000E from VS 2010.

I have a console application (that I will later convert to a service) that handles transferring files based on a database queue.

I am throttling the threads allowed to transfer. This is because some systems we are connecting to can only contain a certain number of connections from certain accounts.

For example, System A can only accept 3 simultaneous connections (which means 3 separate threads). Each one of these threads has their own unique connection object, so we shouldn't run in to any synchronization problems since they aren't sharing a connection.

We want to process the files from those systems in cycles. So, for example, we will allow 3 connections that can transfer up to 100 files per connection. This means, to move 1000 files from System A, we can only process 300 files per cycle, since 3 threads are allowed with 100 files each. Therefore, over the lifetime of this transfer, we will have 10 threads. We can only run 3 at a time. So, there will be 3 cycles, and the last cycle will only use 1 thread to transfer the last 100 files. (3 threads x 100 files = 300 files per cycle)

The current architecture by example is:

  1. A System.Threading.Timer checks the queue every 5 seconds for something to do by calling GetScheduledTask()
  2. If there's nothing to, GetScheduledTask() simply does nothing
  3. If there is work, create a ThreadPool thread to process the work [Work Thread A]
  4. Work Thread A sees that there are 1000 files to transfer
  5. Work Thread A sees that it can only have 3 threads running to the system it is getting files from
  6. Work Thread A starts three new work threads [B,C,D] and transfers
  7. Work Thread A waits for B,C,D [WaitHandle.WaitAll(transfersArray)]
  8. Work Thread A sees that there are still more files in the queue (should be 700 now)
  9. Work Thread A creates a new array to wait on [transfersArray = new TransferArray[3] which is the max for System A, but could vary on system
  10. Work Thread A starts three new work threads [B,C,D] and waits for them [WaitHandle.WaitAll(transfersArray)]
  11. The process repeats until there are no more files to move.
  12. Work Thread A signals that it is done

I am using ManualResetEvent to handle the signaling.

My questions are:

  1. Is there any glaring circumstance which would cause a resource leak or problem that I am experiencing?
  2. Should I loop thru the array after every WaitHandle.WaitAll(array) and call array[index].Dispose()?
  3. The Handle count under the Task Manager for this process slowly creeps up
  4. I am calling the initial creation of Worker Thread A from a System.Threading.Timer. Is there going to be any problems with this? The code for that timer is:

(Some class code for scheduling)

private ManualResetEvent _ResetEvent;

private void Start()
{
    _IsAlive = true;
    ManualResetEvent transferResetEvent = new ManualResetEvent(false);
    //Set the scheduler timer to 5 second intervals
    _ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}

private void ScheduledTasks_Tick(object state)
{
    ManualResetEvent resetEvent = null;
    try
    {
        resetEvent = (ManualResetEvent)state;
        //Block timer until GetScheduledTasks() finishes
        _ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
        GetScheduledTasks();
    }
    finally
    {
        _ScheduledTasks.Change(5000, 5000);
        Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
        resetEvent.Set();
    }
}


private void GetScheduledTask()
{
    try 
    { 
        //Check to see if the database connection is still up
        if (!_IsAlive)
        {
            //Handle
            _ConnectionLostNotification = true;
            return;
        }

        //Get scheduled records from the database
        ISchedulerTask task = null;

        using (DataTable dt = FastSql.ExecuteDataTable(
                _ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
                new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
        {
            if (dt != null)
            {
                if (dt.Rows.Count == 1)
                {  //Only 1 row is allowed
                    DataRow dr = dt.Rows[0];

                    //Get task information
                    TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
                    task = ScheduledTaskFactory.CreateScheduledTask(taskType);

                    task.Description = dr["Description"].ToString();
                    task.IsEnabled = (bool)dr["IsEnabled"];
                    task.IsProcessing = (bool)dr["IsProcessing"];
                    task.IsManualLaunch = (bool)dr["IsManualLaunch"];
                    task.ProcessMachineName = dr["ProcessMachineName"].ToString();
                    task.NextRun = (DateTime)dr["NextRun"];
                    task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
                    task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
                    task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
                    task.SleepMinutes = (int)dr["SleepMinutes"];
                    task.ScheduleId = (int)dr["ScheduleId"];
                    task.CurrentRuns = (int)dr["CurrentRuns"];
                    task.TotalRuns = (int)dr["TotalRuns"];

                    SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
                    //Queue up task to worker thread and start
                    ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);     
                }
            }
        }

    }
    catch (Exception ex)
    {
        //Handle
    }
}

private void ThreadProc(object taskObject)
{
    SchedulerTask task = (SchedulerTask)taskObject;
    ScheduledTaskEngine engine = null;
    try
    {
        engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
        engine.StartTask(task.Task);    
    }
    catch (Exception ex)
    {
        //Handle
    }
    finally
    {
        task.TaskResetEvent.Set();
        task.TaskResetEvent.Dispose();
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

街角迷惘 2024-09-16 17:08:35

0x8007000E 是内存不足错误。这和句柄计数似乎表明存在资源泄漏。确保您正在处理实现 IDisposable 的每个对象。这包括您正在使用的 ManualResetEvent 数组。

如果您有时间,您可能还想转换为使用 .NET 4.0 Task 类;它旨在更干净地处理此类复杂场景。通过定义子 Task 对象,您可以减少总线程数(线程非常昂贵,不仅因为调度,还因为它们的堆栈空间)。

0x8007000E is an out-of-memory error. That and the handle count seem to point to a resource leak. Ensure you're disposing of every object that implements IDisposable. This includes the arrays of ManualResetEvents you're using.

If you have time, you may also want to convert to using the .NET 4.0 Task class; it was designed to handle complex scenarios like this much more cleanly. By defining child Task objects, you can reduce your overall thread count (threads are quite expensive not only because of scheduling but also because of their stack space).

影子的影子 2024-09-16 17:08:35

我正在寻找类似问题的答案(句柄数随着时间的推移而增加)。

我查看了您的应用程序架构,并想向您推荐一些可以帮助您的建议:

您听说过 IOCP(输入输出完成端口)吗?

我不确定使用 C# 实现这一点是否困难,但在 C/C++ 中这是小菜一碟。
通过使用它,您可以创建一个唯一的线程池(该池中的线程数量通常定义为 PC 或服务器中处理器或处理器核心数量的 2 倍)
您将此池与 IOCP 句柄关联,然后该池将完成工作。
请参阅这些功能的帮助:
创建IoCompletionPort();
PostQueuedCompletionStatus();
获取队列完成状态();

一般来说,动态创建和退出线程可能非常耗时,并会导致性能损失和内存碎片。
MSDN 和 google 上有数千篇有关 IOCP 的文献。

I'm looking for answers to a similar problem (Handles Count increasing over time).

I took a look at your application architecture and like to suggest you something that could help you out:

Have you heard about IOCP (Input Output Completion Ports).

I'm not sure of the dificulty to implement this using C# but in C/C++ it is a piece of cake.
By using this you create a unique thread pool (The number of threads in that pool is in general defined as 2 x the number of processors or processors cores in the PC or server)
You associate this pool to a IOCP Handle and the pool does the work.
See the help for these functions:
CreateIoCompletionPort();
PostQueuedCompletionStatus();
GetQueuedCompletionStatus();

In General creating and exiting threads on the fly could be time consuming and leads to performance penalties and memory fragmentation.
There are thousands of literature about IOCP in MSDN and in google.

葵雨 2024-09-16 17:08:35

我认为你应该重新考虑你的架构。事实上,您只能同时拥有 3 个连接,这几乎要求您使用 1 个线程来生成文件列表,并使用 3 个线程来处理它们。您的生产者线程会将所有文件插入队列中,3 个消费者线程将在项目到达队列时出队并继续处理。阻塞队列可以显着简化代码。如果您使用的是 .NET 4.0,则可以利用 BlockingCollection 类。

public class Example
{
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

    public void Start()
    {
        var threads = new Thread[] 
            { 
                new Thread(Producer), 
                new Thread(Consumer), 
                new Thread(Consumer), 
                new Thread(Consumer) 
            };
        foreach (Thread thread in threads)
        {
            thread.Start();
        }
    }

    private void Producer()
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
            ScheduledTask task = GetScheduledTask();
            if (task != null)
            {
                foreach (string file in task.Files)
                {
                    m_Queue.Add(task);
                }
            }
        }
    }

    private void Consumer()
    {
        // Make a connection to the resource that is assigned to this thread only.
        while (true)
        {
            string file = m_Queue.Take();
            // Process the file.
        }
    }
}

我在上面的示例中确实过于简单化了,但我希望您能了解总体思路。请注意,这要简单得多,因为线程同步的方式并不多(大多数将嵌入到阻塞队列中),当然也没有使用 WaitHandle 对象。显然,您必须添加正确的机制来正常关闭线程,但这应该相当容易。

I think you should reconsider your architecture altogether. The fact that you can only have 3 simultaneously connections is almost begging you to use 1 thread to generate the list of files and 3 threads to process them. Your producer thread would insert all files into a queue and the 3 consumer threads will dequeue and continue processing as items arrive in the queue. A blocking queue can significantly simplify the code. If you are using .NET 4.0 then you can take advantage of the BlockingCollection class.

public class Example
{
    private BlockingCollection<string> m_Queue = new BlockingCollection<string>();

    public void Start()
    {
        var threads = new Thread[] 
            { 
                new Thread(Producer), 
                new Thread(Consumer), 
                new Thread(Consumer), 
                new Thread(Consumer) 
            };
        foreach (Thread thread in threads)
        {
            thread.Start();
        }
    }

    private void Producer()
    {
        while (true)
        {
            Thread.Sleep(TimeSpan.FromSeconds(5));
            ScheduledTask task = GetScheduledTask();
            if (task != null)
            {
                foreach (string file in task.Files)
                {
                    m_Queue.Add(task);
                }
            }
        }
    }

    private void Consumer()
    {
        // Make a connection to the resource that is assigned to this thread only.
        while (true)
        {
            string file = m_Queue.Take();
            // Process the file.
        }
    }
}

I have definitely oversimplified things in the example above, but I hope you get the general idea. Notice how this is much simpler as there is not much in the way of thread synchronization (most will be embedded in the blocking queue) and of course there is no use of WaitHandle objects. Obviously you would have to add in the correct mechanisms to shut down the threads gracefully, but that should be fairly easy.

反话 2024-09-16 17:08:35

事实证明,这个奇怪问题的根源与架构无关,而是因为将解决方案从 3.5 转换为 4.0。我重新创建了解决方案,没有执行任何代码更改,并且问题再也没有发生。

It turns out the source of this strange problem was not related to architecture but rather because of converting the solution from 3.5 to 4.0. I re-created the solution, performing no code changes, and the problem never occurred again.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文