处理多处理线程

发布于 2024-11-09 05:43:23 字数 7115 浏览 0 评论 0原文

我在 .Net 4.0 C# 上管理线程时遇到问题,我对线程的了解不足以解决它,所以我将其发布在这里,希望有人能给我一些建议。

场景如下:

我们在 C# 框架 4.0 上有一个 Windows 服务,(1)通过套接字连接到服务器以获取 .PCM 文件,(2)然后将其转换为 .WAV 文件,(3)通过电子邮件 - SMTP,最后 (4) 通知初始服务器已成功发送。

安装该服务的服务器有 8 个处理器和 8 GB 或 RAM。

为了允许多处理,我构建了具有 4 个线程的服务,每个线程都执行我之前提到的每一项任务。

在代码中,我为每个任务都有类和方法,因此我创建线程并调用方法,如下所示:

 Thread eachThread = new Thread(object.PerformTask);

在每个方法内,我都有一个 While 来检查套接字的连接是否处于活动状态并继续获取数据或处理数据,具体取决于在他们的海豚身上。

while (_socket.Connected){ 
//perform task
}

问题是,随着安装更多服务(相同的 Windows 服务被复制并指向服务器上的两个端点以通过套接字获取文件),CPU 消耗急剧增加,每个服务继续运行和处理文件,但有一段时间如果CPU消耗太高,服务器就会崩溃。

问题是:您建议我如何处理这种情况,我的意思是一般而言,处理这种高要求的处理任务以避免服务器在 CPU 消耗方面崩溃的好方法是什么?

谢谢。

PS:如果有人需要有关该场景的更多详细信息,请告诉我。

编辑 1

对于 CPU 崩溃,我的意思是服务器变得太慢,我们必须重新启动它。

编辑 2

在这里,我发布了部分代码,以便您可以了解它的编程方式:

while(true){
            //starting the service    
            try
            {
                IPEndPoint endPoint = conn.SettingConnection();
                string id = _objProp.Parametros.IdApp;

                using (socket = conn.Connect(endPoint))
                {
                    while (!socket.Connected)
                    {
                        _log.SetLog("INFO", "Conectando socket...");
                        socket = conn.Connect(endPoint);

                        //if the connection failed, wait 5 seconds for a new try.
                        if (!socket.Connected)
                        {
                            Thread.Sleep(5000);
                        }
                    }

                    proInThread = new Thread(proIn.ThreadRun);
                    conInThread = new Thread(conIn.ThreadRun);
                    conOutThread = new Thread(conOut.ThreadRun);

                    proInThread.Start();
                    conInThread.Start();
                    conOutThread.Start();

                    proInThread.Join();
                    conInThread.Join();
                    conOutThread.Join();
                }
          }
     }

编辑 3

  • 线程 1

    <块引用>

    while (_socket.Connected) { 尝试 { var conn = new AppConection(ref _objPropiedades);

    <前><代码>尝试 { 字符串消息 = conn.ReceiveMessage(_socket); 锁 (((ICollection)_queue).SyncRoot) { _queue.Enqueue(消息); _syncEvents.NewItemEvent.Set(); _syncEvents.NewResetEvent.Set(); } 锁 (((ICollection)_total_rec).SyncRoot) { _total_rec.Add("1"); } } catch (SocketException 前) { //记录异常 } 捕获(IndexOutOfRangeException 前) { //记录异常 } catch(异常前) { //记录异常 } //收到消息 } catch(异常前) { //记录错误 } } //释放任何可能正在使用内存的实例 _socket.Dispose(); 日志=空;
  • 主题 2

    <块引用>

    while (_socket.Connected) { 尝试{ _syncEvents.NewItemEventOut.WaitOne();

     if (_socket.Connected)
                        {
                            锁 (((ICollection)_queue).SyncRoot)
                            {
    
                                Total_queue = _queue.Count();
    
                            }
    
                            整数 i = 0;
                            while (i < 总队列)
                            {
                                //EMail 电子邮件;
                                字符串邮件=“”;
                                锁 (((ICollection)_queue).SyncRoot)
                                {
    
                                    邮件=_queue.Dequeue();
    
                                    我=我+1;
                                }
                                尝试
                                {
                                    conn.SendMessage(_socket, 邮件);
                                    _syncEvents.NewResetEvent.Set();
                                }
                                catch (SocketException 前)
                                {
                                    //记录异常
                                }
                            }
                        }
                        别的
                        {
                            //记录异常
    
                            _syncEvents.NewAbortEvent.Set();
                            Thread.CurrentThread.Abort();
                        }
                    }
                    捕获(InvalidOperationException e)
                    {
                        //记录异常
                    }
                    捕获(异常 e)
                    {
                        //记录异常
                    }
            }
    
            //释放任何可能正在使用内存的实例
            _socket.Dispose();
            康涅狄格州=空;
            日志=空;
    
    线程
  • 主题 3

    <块引用>

    while (_socket.Connected) {

     int Total_queue = 0;
                    尝试
                {
                    _syncEvents.NewItemEvent.WaitOne();
                    锁(((ICollection)_queue).SyncRoot)
                    {
                        Total_queue = _queue.Count();
                    }
                    整数 i = 0;
                    while (i < 总队列)
                    {
                        if (mgthreads.GetThreatdAct() <
    

    mgthreads.GetMaxThread()) { 字符串消息=“”; 锁(((ICollection)_queue).SyncRoot) {

     message = _queue.Dequeue();
                                我=我+1;
    
                            }
                            计数++;
                            锁(((ICollection)_queueO).SyncRoot)
                            {
                                应用程序.SetParameters(_socket, _id,
    

    消息,_queueO,_syncEvents, _total_Env, _total_err); }

     线程生产者Thread = new
    

    线程(app.ThreadJob) { 名称 = “生产者线程_”+ DateTime.Now.ToString("ddMMyyyyhhmmss"), 优先级 = ThreadPriority.AboveNormal }; 生产者线程.Start();

     ProducerThread.Join();
    
                            mgthreads.IncThreatdAct(生产者线程);
                        }
                        mgthreads.DecThreatdAct();
                    }
                    mgthreads.DecThreatdAct();
                }
                捕获(InvalidOperationException e)
                {
    
                }
                捕获(异常 e)
                {
    
                }
                线程.睡眠(500);
            }
    
            //释放任何可能正在使用内存的实例
            _socket.Dispose();
            应用=空;
            日志=空;
            mgthreads = null;
    
    线程
  • 线程 4

    <块引用>

    MessageVO mesVo = fac.ParseMessageXml(_message);

    线程

I'm experiencing an issue managing threads on .Net 4.0 C#, and my knowledge of threads is not sufficient to solve it, so I've post it here expecting that somebody could give me some piece of advise please.

The scenario is the following:

We have a Windows service on C# framework 4.0 that (1)connects via socket to a server to get a .PCM file, (2)then convert it to a .WAV file, (3)send it via Email - SMTP and finally (4)notify the initial server that it was successfully sent.

The server where the service had been installed has 8 processors and 8 GB or RAM.

To allow multiprocessing I've built the service with 4 threads, each one of them performs each task I mentioned previously.

On the code, I have classes and methods for each task, so I create threads and invoke methods as follows:

 Thread eachThread = new Thread(object.PerformTask);

Inside each method I'm having a While that checks if the connection of the socket is alive and continue fetching data or processing data depending on their porpuse.

while (_socket.Connected){ 
//perform task
}

The problem is that as more services are being installed (the same windows service is replicated and pointed between two endpoints on the server to get the files via socket) the CPU consumption increases dramatically, each service continues running and processing files but there is a moment were the CPU consumption is too high that the server just collapse.

The question is: what would you suggest me to handle this scenario, I mean in general terms what could be a good approach of handling this highly demanded processing tasks to avoid the server to collapse in CPU consumption?

Thanks.

PS.: If anybody needs more details on the scenario, please let me know.

Edit 1

With CPU collapse I mean that the server gets too slow that we have to restart it.

Edit 2

Here I post some part of the code so you can get an idea of how it's programmed:

while(true){
            //starting the service    
            try
            {
                IPEndPoint endPoint = conn.SettingConnection();
                string id = _objProp.Parametros.IdApp;

                using (socket = conn.Connect(endPoint))
                {
                    while (!socket.Connected)
                    {
                        _log.SetLog("INFO", "Conectando socket...");
                        socket = conn.Connect(endPoint);

                        //if the connection failed, wait 5 seconds for a new try.
                        if (!socket.Connected)
                        {
                            Thread.Sleep(5000);
                        }
                    }

                    proInThread = new Thread(proIn.ThreadRun);
                    conInThread = new Thread(conIn.ThreadRun);
                    conOutThread = new Thread(conOut.ThreadRun);

                    proInThread.Start();
                    conInThread.Start();
                    conOutThread.Start();

                    proInThread.Join();
                    conInThread.Join();
                    conOutThread.Join();
                }
          }
     }

Edit 3

  • Thread 1

    while (_socket.Connected)
    {
    try
    {
    var conn = new AppConection(ref _objPropiedades);

                    try
                    {
                        string message = conn.ReceiveMessage(_socket);
                        lock (((ICollection)_queue).SyncRoot)
                        {
                            _queue.Enqueue(message);
                            _syncEvents.NewItemEvent.Set();
                            _syncEvents.NewResetEvent.Set();
                        }
                        lock (((ICollection)_total_rec).SyncRoot)
                        {
    
                            _total_rec.Add("1");
                        }
                    }
                    catch (SocketException ex)
                    {
                        //log exception
    
                    }
                    catch (IndexOutOfRangeException ex)
                    {
                        //log exception
                    }
                    catch (Exception ex)
                    {
                       //log exception
    
                    }
                    //message received
    
                }
                catch (Exception ex)
                {
                   //logging error
                }
            }
    
            //release ANY instance that could be using memory
            _socket.Dispose();
            log = null;
    
  • Thread 2

    while (_socket.Connected)
    {
    try{
    _syncEvents.NewItemEventOut.WaitOne();

                        if (_socket.Connected)
                        {
                            lock (((ICollection)_queue).SyncRoot)
                            {
    
                                total_queue = _queue.Count();
    
                            }
    
                            int i = 0;
                            while (i < total_queue)
                            {
                                //EMail Emails;
                                string mail = "";
                                lock (((ICollection)_queue).SyncRoot)
                                {
    
                                    mail = _queue.Dequeue();
    
                                    i = i + 1;
                                }
                                try
                                {
                                    conn.SendMessage(_socket, mail);
                                    _syncEvents.NewResetEvent.Set();
                                }
                                catch (SocketException ex)
                                {
                                    //log exception
                                }
                            }
                        }
                        else
                        {
                            //log exception
    
                            _syncEvents.NewAbortEvent.Set();
                            Thread.CurrentThread.Abort();
                        }
                    }
                    catch (InvalidOperationException e)
                    {
                        //log exception
                    }
                    catch (Exception e)
                    {
                        //log exception
                    }
            }
    
            //release ANY instance that could be using memory
            _socket.Dispose();
            conn = null;
            log = null;
    
  • Thread 3

    while (_socket.Connected)
    {

                    int total_queue = 0;
                    try
                {
                    _syncEvents.NewItemEvent.WaitOne();
                    lock (((ICollection) _queue).SyncRoot)
                    {
                        total_queue = _queue.Count();
                    }
                    int i = 0;
                    while (i < total_queue)
                    {
                        if (mgthreads.GetThreatdAct() <
    

    mgthreads.GetMaxThread())
    {
    string message = "";
    lock (((ICollection) _queue).SyncRoot)
    {

                                message = _queue.Dequeue();
                                i = i + 1;
    
                            }
                            count++;
                            lock (((ICollection) _queueO).SyncRoot)
                            {
                                app.SetParameters(_socket, _id,
    

    message, _queueO, _syncEvents,
    _total_Env, _total_err);
    }

                            Thread producerThread = new
    

    Thread(app.ThreadJob) { Name =
    "ProducerThread_" +
    DateTime.Now.ToString("ddMMyyyyhhmmss"),
    Priority = ThreadPriority.AboveNormal
    };
    producerThread.Start();

                            producerThread.Join();
    
                            mgthreads.IncThreatdAct(producerThread);
                        }
                        mgthreads.DecThreatdAct();
                    }
                    mgthreads.DecThreatdAct();
                }
                catch (InvalidOperationException e)
                {
    
                }
                catch (Exception e)
                {
    
                }
                Thread.Sleep(500);
            }
    
            //release ANY instance that could be using memory
            _socket.Dispose();
            app = null;
            log = null;
            mgthreads = null;
    
  • Thread 4

    MessageVO mesVo =
    fac.ParseMessageXml(_message);

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

dawn曙光 2024-11-16 05:43:23

我会降低线程优先级,并让所有线程都通过信号量,从而将并发限制为Environment.ProcessorCount。这不是一个完美的解决方案,但听起来在这种情况下就足够了并且很容易修复。

编辑:想一想,您必须将 10 个服务合并到一个进程中,否则您将无法集中控制正在运行的线程。如果你有 10 个独立的进程,它们就无法协调。

I would lower the thread priority and have all threads pass through a Semaphore that limits concurrency to Environment.ProcessorCount. This not a perfect solution but it sounds like it is enough in this case and an easy fix.

Edit: Thinking about it, you have to fold the 10 services into one single process because otherwise you won't have centralized control about the threads that are running. If you have 10 independent processes they cannot coordinate.

不如归去 2024-11-16 05:43:23

通常不会因为 CPU 使用率高而崩溃。当任何线程正在等待远程事件发生时(例如远程服务器响应请求),该线程不使用 cpu 资源。但当它实际上在做某事时,它会相应地使用 cpu。在您提到的任务中,不存在固有的高CPU使用率(因为将PCM文件保存为WAV不需要复杂的算法),因此高CPU使用率似乎是编程错误的标志。

There should normally be no collapse because of high cpu usage. While any of the threads is waiting for something remote to happen (for instance for the remote server to response to the request), that thread uses no cpu resource. But while it is actually doing something, it uses cpu accordingly. In the Task you mentioned, there is no inherent high cpu usage (as the saving of PCM file as WAV requires no complex algorithm), so the high cpu usage seems to be a sign of an error in programming.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文