确保调用 Socket.XXXAsync 的线程保持活动状态以完成 IO 请求(IO 完成端口,C#)

发布于 2024-10-28 11:08:21 字数 5654 浏览 0 评论 0原文

我正在实现一个小型库,以便更轻松地使用 System.Net.Sockets.Socket。它应该处理任意数量的侦听和接收 TCP 套接字,并且实现速度快非常重要。

我使用 XXXAsync 方法和 CLR ThreadPool 回调库用户的委托(例如,每当成功发送消息或接收到某些数据时) 。最好该库不会自行启动任何线程。

该库的用户可以访问我的 Sockets 包装器的接口来发送消息或开始接收消息(以及许多其他方法和重载):

public interface IClientSocket {
    // will eventually call Socket.SendAsync
    IMessageHandle SendAsync(byte[] buffer, int offset, int length);

    // will eventually call Socket.RecieveAsync
    void StartReceiveAsync();
}

XXXAsync 方法使用 IO Completion港口。因此,调用这些方法的线程必须保持活动状态,直到操作完成,否则操作将失败并显示 SocketError.OperationAborted(我认为是这种情况,或者不是?)。 对库的用户施加这样的限制是丑陋的并且容易出错。

这里最好的选择是什么?

  • 使用委托调用ThreadPool.QueueUserWorkItem来调用XXXAsync方法?这样安全吗?我在某处读到,ThreadPool 不会停止具有任何 IOCP 的空闲线程。那就太好了,因为它解决了上述问题。
    但对于许多 TCP 连接也可能会很糟糕。 在这种情况下,每个 ThreadPool 线程很可能都会调用一个待处理的 ReceiveAsync 调用。因此,即使当前工作负载较低并且许多线程处于空闲状态(并且浪费内存),ThreadPool 也永远不会收缩。

  • 启动一个始终处于活动状态的专用线程并调用XXXAsync方法。例如,当库用户想要发送数据时,它将委托放入同步队列中,线程将其弹出并调用 SendAsync 方法。
    我不太喜欢这个解决方案,因为它浪费了一个线程,并且在多核机器上,发送仅由一个线程执行。

此外,这两种解决方案都不是最好的,因为它们将调用异步方法的作业传递给另一个线程。这可以避免吗?

你怎么认为? (谢谢!!)

Thomas

编辑1:

我可以使用以下测试程序重现SocketError.OperationAborted 问题(我认为它是正确的)。
编译、启动并telnet到端口127.0.0.1:10000。发送“t”或“T”并等待> 3秒。发送“T”时,ReceiveAsync 调用在 ThreadPool 中完成(有效),发送“t”时将启动一个新线程,并在 3 秒后终止(失败)。

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Collections.Generic;

namespace Test {
    class Program {
        static List<Socket> _referenceToSockets = new List<Socket>();
        static void Main(string[] args) {
            Thread.CurrentThread.Name = "Main Thread";

            // Create a listening socket on Port 10000.
            Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            _referenceToSockets.Add(ServerSocket);
            var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000);
            ServerSocket.Bind(endPoint);
            ServerSocket.Listen(50);

            // Start listening.
            var saeaAccept = new SocketAsyncEventArgs();
            saeaAccept.Completed += OnCompleted;
            ServerSocket.AcceptAsync(saeaAccept);

            Console.WriteLine(String.Format("Listening on {0}.", endPoint));
            Console.ReadLine();
        }

        private static void OnCompleted(object obj, SocketAsyncEventArgs evt) {
            var socket = (Socket)obj;
            Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no"));
            switch (evt.LastOperation) {
                case SocketAsyncOperation.Accept:
                    // Client connected. Listen for more.
                    Socket clientSocket = evt.AcceptSocket;
                    _referenceToSockets.Add(clientSocket);
                    evt.AcceptSocket = null;
                    socket.AcceptAsync(evt);

                    // Start receiving data.
                    var saeaReceive = new SocketAsyncEventArgs();
                    saeaReceive.Completed += OnCompleted;
                    saeaReceive.SetBuffer(new byte[1024], 0, 1024);
                    clientSocket.ReceiveAsync(saeaReceive);
                    break;
                case SocketAsyncOperation.Disconnect:
                    socket.Close();
                    evt.Dispose();
                    break;
                case SocketAsyncOperation.Receive:
                    if (evt.SocketError != SocketError.Success) {
                        socket.DisconnectAsync(evt);
                        return;
                    }
                    var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred);
                    Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText));
                    if (evt.BytesTransferred == 0) {
                        socket.Close();
                        evt.Dispose();
                    }
                    if (asText.ToUpper().StartsWith("T")) {
                        Action<object> action = (object o) => {
                            socket.ReceiveAsync(evt);
                            Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o));
                            Thread.Sleep(3000);
                            Console.WriteLine("End of Action...");
                        };
                        if (asText.StartsWith("T")) {
                            ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool");
                        } else {
                            new Thread(o=>action(o)).Start("in a new Thread");
                        }
                    } else {
                        socket.ReceiveAsync(evt);
                    }
                    break;
            }
        }
    }
}

编辑 #3

以下是我打算使用的解决方案:

我让库用户的线程直接调用 XXXAsync(SendAsync 除外)操作。在大多数情况下,调用将会成功(因为调用线程很少终止)。
如果操作因 SocketError.OperationAborted 失败,则库仅使用异步回调中的当前线程再次调用该操作。这是一个 ThreadPool 线程,它很有可能成功(如果出现 SocketError.OperationAborted 的原因,将设置一个附加标志,最多使用一次此解决方法)是由于其他一些错误造成的)。 这应该可以工作,因为套接字本身仍然没问题,只是之前的操作失败了。

对于 SendAsync,此解决方法不起作用,因为它可能会打乱消息的顺序。在本例中,我会将消息放入 FIFO 列表中进行排队。我将使用 ThreadPool 将它们出列并通过 SendAsync 发送它们。

I am implementing a small library to make the use of System.Net.Sockets.Socket easier. It should handle an arbitrary number of listening and receiving TCP sockets and it is important that the implementation is ~fast~.

I am using the XXXAsync methods and the CLR ThreadPool to call back into the delegates of the library user (for example, whenever a message was successfully sent or some data was received). Preferable the library wouldn't start any threads by itself.

The user of the library can access an interface to my Sockets wrapper to send message or to start receiving messages (among many other methods and overloads):

public interface IClientSocket {
    // will eventually call Socket.SendAsync
    IMessageHandle SendAsync(byte[] buffer, int offset, int length);

    // will eventually call Socket.RecieveAsync
    void StartReceiveAsync();
}

The XXXAsync methods use IO Completion Ports. Thus, the thread which calls these methods must stay alive until the operation completes, otherwise the operation fails with SocketError.OperationAborted (I think that is the case, or no?).
Imposing such a restriction on the user of the library is ugly and error prone.

What is the best alternative here?

  • Calling ThreadPool.QueueUserWorkItem with a delegate to invoke the XXXAsync methods? Is that safe? I read somewhere, that the ThreadPool won't stop idle threads which have any IOCP. That would be good as it solves the problem above.
    But it could also be bad with many TCP connections.
    In such a case is is likely that each ThreadPool thread invoked one of the pending ReceiveAsync calls. Therefore the ThreadPool would never shrink even if the current workload is low and many Threads are idle (and wasting memory).

  • Starting a dedicated thread which is always alive and invokes the XXXAsync methods. For example, when a library user want to send data, it places a delegate into a syncronized queue, the thread pops it and invokes the SendAsync method.
    I don't like this solution much, because it wastes a thread and on a multi core machine, sending is only performed by one thread.

Moreover both solutions don't seam the best because they pass the job to call the asyncronous method to another thread. Can that be avoided?

What do you think? (Thank you!!)

Thomas

EDIT 1:

I can reproduce the SocketError.OperationAborted problem with the following test program (I think it is correct).
Compile, start and telnet to port 127.0.0.1:10000. Send a "t" or a "T" and wait for > 3 seconds. When sending "T", the ReceiveAsync call is done in the ThreadPool (works), with "t" a new thread is started which terminates after 3 seconds (fails).

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Collections.Generic;

namespace Test {
    class Program {
        static List<Socket> _referenceToSockets = new List<Socket>();
        static void Main(string[] args) {
            Thread.CurrentThread.Name = "Main Thread";

            // Create a listening socket on Port 10000.
            Socket ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            _referenceToSockets.Add(ServerSocket);
            var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 10000);
            ServerSocket.Bind(endPoint);
            ServerSocket.Listen(50);

            // Start listening.
            var saeaAccept = new SocketAsyncEventArgs();
            saeaAccept.Completed += OnCompleted;
            ServerSocket.AcceptAsync(saeaAccept);

            Console.WriteLine(String.Format("Listening on {0}.", endPoint));
            Console.ReadLine();
        }

        private static void OnCompleted(object obj, SocketAsyncEventArgs evt) {
            var socket = (Socket)obj;
            Console.WriteLine(String.Format("Async operation completed: {0}; Error: {1}; Callback-Thread: \"{2}\" ({3} threadpool)", evt.LastOperation, evt.SocketError, Thread.CurrentThread.Name, Thread.CurrentThread.IsThreadPoolThread?"is":"no"));
            switch (evt.LastOperation) {
                case SocketAsyncOperation.Accept:
                    // Client connected. Listen for more.
                    Socket clientSocket = evt.AcceptSocket;
                    _referenceToSockets.Add(clientSocket);
                    evt.AcceptSocket = null;
                    socket.AcceptAsync(evt);

                    // Start receiving data.
                    var saeaReceive = new SocketAsyncEventArgs();
                    saeaReceive.Completed += OnCompleted;
                    saeaReceive.SetBuffer(new byte[1024], 0, 1024);
                    clientSocket.ReceiveAsync(saeaReceive);
                    break;
                case SocketAsyncOperation.Disconnect:
                    socket.Close();
                    evt.Dispose();
                    break;
                case SocketAsyncOperation.Receive:
                    if (evt.SocketError != SocketError.Success) {
                        socket.DisconnectAsync(evt);
                        return;
                    }
                    var asText = Encoding.ASCII.GetString(evt.Buffer, evt.Offset, evt.BytesTransferred);
                    Console.WriteLine(String.Format("Received: {0} bytes: \"{1}\"", evt.BytesTransferred, asText));
                    if (evt.BytesTransferred == 0) {
                        socket.Close();
                        evt.Dispose();
                    }
                    if (asText.ToUpper().StartsWith("T")) {
                        Action<object> action = (object o) => {
                            socket.ReceiveAsync(evt);
                            Console.WriteLine(String.Format("Called ReceiveAsync {0}...", o));
                            Thread.Sleep(3000);
                            Console.WriteLine("End of Action...");
                        };
                        if (asText.StartsWith("T")) {
                            ThreadPool.QueueUserWorkItem(o=>action(o), "in ThreadPool");
                        } else {
                            new Thread(o=>action(o)).Start("in a new Thread");
                        }
                    } else {
                        socket.ReceiveAsync(evt);
                    }
                    break;
            }
        }
    }
}

EDIT #3

The following is the solution I intend to use:

I let the thread of the library user invoke the XXXAsync (except SendAsync) operations directly. In the most cases the call will be successfull (because the calling thread does terminate only seldomly).
If the operation failed with SocketError.OperationAborted, the library just invokes the operation again using the current thread from the async callback. This one is a ThreadPool thread and it has a good chance to succeed (an additional flag will be set, to use this workaround at most once if the reason for SocketError.OperationAborted was cause due to some other error).
This should work because the socket itself is still ok, just the previous operation failed.

For the SendAsync, this workaround does not work, because it could mess up the order of the messages. In this case I will queue the messages in a FIFO list. I will use the ThreadPool to dequeue them and send them via SendAsync.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

烂柯人 2024-11-04 11:08:21

这里有一些危险信号。如果你在保持线程活动时遇到困难,那么你的线程就没有做任何有用的事情。在这种情况下,使用异步方法没有意义。如果速度是您唯一关心的问题,那么您不应该使用异步方法。它们要求 Socket 抓住一个 tp 线程来进行回调。这是很小的开销,但如果常规线程进行阻塞调用,则没有任何开销。

仅当您需要使应用程序在处理许多并发连接时能够很好地扩展时,才应考虑异步方法。这会产生非常突发的 CPU 负载,非常适合 tp 线程。

A couple of red flags here. If you've got trouble keeping the thread alive then you've got a thread that's not doing anything useful. In which case it doesn't make sense to use Async methods. If speed is your only concern then you should not use the Async methods. They require that the Socket grabs a tp thread to make the callback. That's small overhead but there is none if a regular thread makes a blocking call.

You should only consider the Async methods when you need to make your app scale well when you handle many simultaneous connections. That produces very bursty cpu loads, perfect for tp threads.

殤城〤 2024-11-04 11:08:21

调用这些方法的线程必须保持活动状态,直到操作完成”

我不相信这个说法的真实性。当您启动网络操作时,它会被移交给 Windows IO 系统。当发生网络事件(即数据已收到并且您的应用程序已收到通知),这可以很高兴地由另一个线程处理(即使启动线程早已死亡)

您确定不会删除对重要内容的引用并允许它被删除吗?垃圾收集了吗?

如果如果您处于允许线程死亡的情况,那么我怀疑您的代码还没有准备好扩展到超高速/一流的客户端,如果您真的希望扩展,那么您对线程的使用应该是。非常严格地控制让一个线程死亡的成本(我假设另一个线程需要旋转来取代它)不是你应该允许发生太多的事情,线程池绝对是这里的方法。

the thread which calls these methods must stay alive until the operation completes"

I'm not convinced of the veractiy of this statement. When you start a network operation this is handed off to the Windows IO system. When a network event occurs (i.e. data is received and your app is notified) this can happily be dealt with by another thread (even if the initiating thread has long since died).

Are you sure you're not dropping a reference to something crucial and allowing it to be garbage collected?

If you are in a situation where threads are allowed to die then I would suspect that your code isn't ready to scale to the mega-fast/stellar # of clients. If you're really looking to scale, your use of threads should be very tightly controlled. The cost of letting a thread die (where I'm assuming another would need spinning up to replace it) isn't one that you should allow to happen too much. Thread pools are very definitely the way to go here.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文