套接字在一定时间后受到限制

发布于 2024-12-22 18:58:17 字数 12485 浏览 0 评论 0原文

我正在用 C# 创建一个可以在任何应用程序中使用的网络库,作为该库的一部分,我有一个 TCP 客户端/服务器设置。这种设置几乎在所有情况下都能完美运行;在最小和中等压力负载下,它可以完美地连接、发送/接收数据和断开连接。但是,当我从客户端向服务器发送大量数据时,客户端套接字会工作不同的时间(有时短,有时长),然后暂时拒绝发送数据。具体来说,我的数据速率从 550-750 KBps 范围变为 0 KBps,并再次停留在该位置一段不同的时间。然后套接字将在很短的时间内再次开始发送,并再次受到“限制”。在限制期间,我假设套接字已断开连接,因为我无法发送任何内容,但轮询返回套接字已使用以下代码连接:


public bool IsConnected(Socket socket)
{
     try
     {
         return !(socket.Poll(1, SelectMode.SelectRead) && socket.Available == 0);
     }
     catch (SocketException) { return false; }
}

我刚刚在大学上了一门网络课程,所以我开始考虑拥塞控制TCP中的流量控制机制,但在我看来,两者都不会导致这个问题;拥塞控制只会减慢数据速率,并且接收器端的完整缓冲区不会持续我获得 0 KBps 数据速率的时间长度。该症状似乎表明存在某种类型的严重数据限制或大规模数据包丢失。

我的问题是:由于缺乏更好的术语,有谁知道可能导致此数据“节流”的原因是什么?另外,我发送的数据包是否有可能比我的路由器更远,即使它们被发送到同一子网中的主机?

编辑:很清楚,我试图解决这个问题的原因问题是因为我想通过 TCP 以尽可能高的数据速率发送文件。我知道也可以使用 UDP,我也将使用它制定一个解决方案,但我希望 TCP 首先工作。

具体信息:

我正在使用阻塞读/写操作,并且服务器是多线程的。客户端也运行在自己的线程上。我正在本地子网上进行测试,通过路由器反弹所有数据包,该路由器的吞吐量应为 54 Mbps。每个数据包大小为 8 KB,每秒最多发送 1000 次(发送线程休眠 1 毫秒),但显然没有达到该速率。减小数据包的大小以降低数据速率会导致限制消失。 Windows 7 机器,1 个服务器,1 个客户端。发送操作总是完成,错误的是接收操作。

发送操作如下:


//get a copy of all the packets currently in the queue
                    IPacket[] toSend;
                    lock (packetQueues[c])
                    {
                        if (packetQueues[c].Count > SEND_MAX)
                        {
                            toSend = packetQueues[c].GetRange(0, SEND_MAX).ToArray();
                            packetQueues[c].RemoveRange(0, SEND_MAX);
                        }
                        else
                        {
                            toSend = packetQueues[c].ToArray();
                            packetQueues[c].RemoveRange(0, toSend.Length);
                        }
                    }
                    if (toSend != null && toSend.Length > 0)
                    { //write the packets to the network stream
                        try
                        {
                            writer.Write(toSend.Length);
                        }
                        catch (Exception e)
                        {
                            Logger.Log(e);
                            if (showErrorMessages)
                                MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK);
                        }
                        for (int i = 0; i < toSend.Length; i++)
                        {
                            try
                            {
                                toSend[i].Write(writer);
                                if (onSend != null)
                                {
                                    object[] args = new object[2];
                                    args[0] = c;
                                    args[1] = toSend[i];
                                    onSend(args);
                                }
                            }
                            catch (Exception e)
                            {
                                Logger.Log(e);
                                if (showErrorMessages)
                                    MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK);
                            }
                        }
                    }

这是接收代码:


try
                    { 
                        //default buffer size of a TcpClient is 8192 bytes, or 2048 characters
                        if (client.Available > 0)
                        {
                            int numPackets = reader.ReadInt32();
                            for (int i = 0; i < numPackets; i++)
                            {
                                readPacket.Clear();
                                readPacket.Read(reader);
                                if (owner != null)
                                {
                                    owner.AcceptPacket(readPacket, c); //application handles null packets itself.
                                    if (onReceive != null)
                                    {
                                        object[] args = new object[2];
                                        args[0] = c;
                                        args[1] = readPacket;
                                        onReceive(args);
                                    }
                                }
                            }
                            timestamps[c] = TimeManager.GetCurrentMilliseconds();
                        }
                        else
                        {
                            double now = TimeManager.GetCurrentMilliseconds();
                            if (now - timestamps[c] >= timeToDisconnect)
                            { //if timestamp is old enough, check for connection.
                                connected[c] = IsConnected(client.Client);
                                if (!connected[c])
                                {
                                    netStream.Close();
                                    clients[c].Close();
                                    numConnections--;
                                    if (onTimeout != null) onTimeout(c);
                                }
                                else
                                {
                                    timestamps[c] = now;
                                }
                            }
                        }

                    }
                    catch (Exception s)
                    {
                        Logger.Log(s);
                        if (showErrorMessages)
                            MessageBox.Show("Client " + (int)c + ": " + s, "Error", MessageBoxButtons.OK);
                    }

数据包发送/接收:


public void Write(BinaryWriter w)
        {
            w.Write(command); //byte
            w.Write(data.Type); //short
            w.Write(data.Data.Length); //int
            w.Write(data.Data); //byte array
            w.Flush();
        }

        /// <summary>
        /// Reads a command packet from data off a network stream.
        /// </summary>
        /// <param name="r">The stream reader.</param>
        public void Read(BinaryReader r)
        {
            command = r.ReadByte();
            short dataType = r.ReadInt16();
            int dataSize = r.ReadInt32();
            byte[] bytes = r.ReadBytes(dataSize);
            data = new PortableObject(dataType, bytes);
        } 

完整服务器通信循环:


public void Communicate(object cl)
        {
            int c = (int)cl;
            timestamps[c] = TimeManager.GetCurrentMilliseconds();
            try
            {
                //Console.Out.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " has started up. c = " + (int)c);

                TcpClient client = clients[c];
                client.ReceiveTimeout = 100;

                NetworkStream netStream = client.GetStream();
                BinaryReader reader = new BinaryReader(netStream);
                BinaryWriter writer = new BinaryWriter(netStream);

                while (client != null && connected[c])
                {
                    #region Receive
                    try
                    { 
                        //default buffer size of a TcpClient is 8192 bytes, or 2048 characters
                        if (client.Available > 0)
                        {
                            int numPackets = reader.ReadInt32();
                            for (int i = 0; i < numPackets; i++)
                            {
                                readPacket.Clear();
                                readPacket.Read(reader);
                                if (owner != null)
                                {
                                    owner.AcceptPacket(readPacket, c); //application handles null packets itself.
                                    if (onReceive != null)
                                    {
                                        object[] args = new object[2];
                                        args[0] = c;
                                        args[1] = readPacket;
                                        onReceive(args);
                                    }
                                }
                            }
                            timestamps[c] = TimeManager.GetCurrentMilliseconds();
                        }
                        else
                        {
                            double now = TimeManager.GetCurrentMilliseconds();
                            if (now - timestamps[c] >= timeToDisconnect)
                            { //if timestamp is old enough, check for connection.
                                connected[c] = IsConnected(client.Client);
                                if (!connected[c])
                                {
                                    netStream.Close();
                                    clients[c].Close();
                                    numConnections--;
                                    if (onTimeout != null) onTimeout(c);
                                }
                                else
                                {
                                    timestamps[c] = now;
                                }
                            }
                        }

                    }
                    catch (Exception s)
                    {
                        Logger.Log(s);
                        if (showErrorMessages)
                            MessageBox.Show("Client " + (int)c + ": " + s, "Error", MessageBoxButtons.OK);
                    }
                    #endregion

                    Thread.Sleep(threadLatency);

                    #region Send
                    //get a copy of all the packets currently in the queue
                    IPacket[] toSend;
                    lock (packetQueues[c])
                    {
                        if (packetQueues[c].Count > SEND_MAX)
                        {
                            toSend = packetQueues[c].GetRange(0, SEND_MAX).ToArray();
                            packetQueues[c].RemoveRange(0, SEND_MAX);
                        }
                        else
                        {
                            toSend = packetQueues[c].ToArray();
                            packetQueues[c].RemoveRange(0, toSend.Length);
                        }
                    }
                    if (toSend != null && toSend.Length > 0)
                    { //write the packets to the network stream
                        try
                        {
                            writer.Write(toSend.Length);
                        }
                        catch (Exception e)
                        {
                            Logger.Log(e);
                            if (showErrorMessages)
                                MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK);
                        }
                        for (int i = 0; i < toSend.Length; i++)
                        {
                            try
                            {
                                toSend[i].Write(writer);
                                if (onSend != null)
                                {
                                    object[] args = new object[2];
                                    args[0] = c;
                                    args[1] = toSend[i];
                                    onSend(args);
                                }
                            }
                            catch (Exception e)
                            {
                                Logger.Log(e);
                                if (showErrorMessages)
                                    MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK);
                            }
                        }
                    }
                    #endregion
                }
            }
            catch (ThreadAbortException tae) 
            { 
                Logger.Log(tae); 
                MessageBox.Show("Thread " + (int)cl + " was aborted.", "Error", MessageBoxButtons.OK); 
            }
        }   

I am creating a networking library in C# that I can use in any application, and as part of this library I have a TCP client/server setup. This setup works perfectly in almost every situation; it connects, sends/receives data, and disconnects flawlessly when under minimal and medium stress loads. However, when I send large amounts of data from the client to the server, the client socket works for a varied amount of time (sometimes short, sometimes long) and then just refuses to send data for a while. Specifically, my data rate goes from the 550-750 KBps range to 0 KBps, and sits there for again a varied amount of time. Then the socket will start sending again for a very short time, and get "throttled" again. During the throttling, i was assuming that the socket was disconnected because I couldn't send anything, but Polling returns that the socket IS connected using this code:


public bool IsConnected(Socket socket)
{
     try
     {
         return !(socket.Poll(1, SelectMode.SelectRead) && socket.Available == 0);
     }
     catch (SocketException) { return false; }
}

I just took a networking class at my college, so I started thinking about the congestion control and flow control mechanisms in TCP, but it seems to me that neither would cause this problem; congestion control only slows the data rate, and a full buffer on the receiver's side wouldn't last nearly the length of time I am getting a 0 KBps data rate. The symptom seems to point towards either some type of heavy data throttling or mass scale dropping of packets.

My question is this: does anyone have any idea what might be causing this data "throttling", for lack of a better term? Also, is it possible that the packets I send are going further than just my router even though they are addressed to a host in the same subnet?

Edit: Just so it is clear, the reason I am trying to fix this problem is because I want to send files over TCP at the highest possible data rate. I understand that UDP can be used as well, and I will also be making a solution using it, but I want TCP to work first.

Specific Information:

I am using blocking read/write operations, and the server is multi-threaded. The client runs on its own thread as well. I am testing on my local subnet, bouncing all packets through my router, which should have a throughput of 54 Mbps. The packets are 8 KB each in size, and at maximum would be sent 1000 times a second (sending thread sleeps 1 ms), but obviously are not reaching that rate. Reducing the size of the packets so the data rate is lower causes the throttling to disappear. Windows 7 machines, 1 server, 1 client. The send operation always completes, it is the receive operation that errors out.

The send operation is below:


//get a copy of all the packets currently in the queue
                    IPacket[] toSend;
                    lock (packetQueues[c])
                    {
                        if (packetQueues[c].Count > SEND_MAX)
                        {
                            toSend = packetQueues[c].GetRange(0, SEND_MAX).ToArray();
                            packetQueues[c].RemoveRange(0, SEND_MAX);
                        }
                        else
                        {
                            toSend = packetQueues[c].ToArray();
                            packetQueues[c].RemoveRange(0, toSend.Length);
                        }
                    }
                    if (toSend != null && toSend.Length > 0)
                    { //write the packets to the network stream
                        try
                        {
                            writer.Write(toSend.Length);
                        }
                        catch (Exception e)
                        {
                            Logger.Log(e);
                            if (showErrorMessages)
                                MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK);
                        }
                        for (int i = 0; i < toSend.Length; i++)
                        {
                            try
                            {
                                toSend[i].Write(writer);
                                if (onSend != null)
                                {
                                    object[] args = new object[2];
                                    args[0] = c;
                                    args[1] = toSend[i];
                                    onSend(args);
                                }
                            }
                            catch (Exception e)
                            {
                                Logger.Log(e);
                                if (showErrorMessages)
                                    MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK);
                            }
                        }
                    }

And this is the receive code:


try
                    { 
                        //default buffer size of a TcpClient is 8192 bytes, or 2048 characters
                        if (client.Available > 0)
                        {
                            int numPackets = reader.ReadInt32();
                            for (int i = 0; i < numPackets; i++)
                            {
                                readPacket.Clear();
                                readPacket.Read(reader);
                                if (owner != null)
                                {
                                    owner.AcceptPacket(readPacket, c); //application handles null packets itself.
                                    if (onReceive != null)
                                    {
                                        object[] args = new object[2];
                                        args[0] = c;
                                        args[1] = readPacket;
                                        onReceive(args);
                                    }
                                }
                            }
                            timestamps[c] = TimeManager.GetCurrentMilliseconds();
                        }
                        else
                        {
                            double now = TimeManager.GetCurrentMilliseconds();
                            if (now - timestamps[c] >= timeToDisconnect)
                            { //if timestamp is old enough, check for connection.
                                connected[c] = IsConnected(client.Client);
                                if (!connected[c])
                                {
                                    netStream.Close();
                                    clients[c].Close();
                                    numConnections--;
                                    if (onTimeout != null) onTimeout(c);
                                }
                                else
                                {
                                    timestamps[c] = now;
                                }
                            }
                        }

                    }
                    catch (Exception s)
                    {
                        Logger.Log(s);
                        if (showErrorMessages)
                            MessageBox.Show("Client " + (int)c + ": " + s, "Error", MessageBoxButtons.OK);
                    }

Packet send/receive:


public void Write(BinaryWriter w)
        {
            w.Write(command); //byte
            w.Write(data.Type); //short
            w.Write(data.Data.Length); //int
            w.Write(data.Data); //byte array
            w.Flush();
        }

        /// <summary>
        /// Reads a command packet from data off a network stream.
        /// </summary>
        /// <param name="r">The stream reader.</param>
        public void Read(BinaryReader r)
        {
            command = r.ReadByte();
            short dataType = r.ReadInt16();
            int dataSize = r.ReadInt32();
            byte[] bytes = r.ReadBytes(dataSize);
            data = new PortableObject(dataType, bytes);
        } 

Full Server Communication Loop:


public void Communicate(object cl)
        {
            int c = (int)cl;
            timestamps[c] = TimeManager.GetCurrentMilliseconds();
            try
            {
                //Console.Out.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " has started up. c = " + (int)c);

                TcpClient client = clients[c];
                client.ReceiveTimeout = 100;

                NetworkStream netStream = client.GetStream();
                BinaryReader reader = new BinaryReader(netStream);
                BinaryWriter writer = new BinaryWriter(netStream);

                while (client != null && connected[c])
                {
                    #region Receive
                    try
                    { 
                        //default buffer size of a TcpClient is 8192 bytes, or 2048 characters
                        if (client.Available > 0)
                        {
                            int numPackets = reader.ReadInt32();
                            for (int i = 0; i < numPackets; i++)
                            {
                                readPacket.Clear();
                                readPacket.Read(reader);
                                if (owner != null)
                                {
                                    owner.AcceptPacket(readPacket, c); //application handles null packets itself.
                                    if (onReceive != null)
                                    {
                                        object[] args = new object[2];
                                        args[0] = c;
                                        args[1] = readPacket;
                                        onReceive(args);
                                    }
                                }
                            }
                            timestamps[c] = TimeManager.GetCurrentMilliseconds();
                        }
                        else
                        {
                            double now = TimeManager.GetCurrentMilliseconds();
                            if (now - timestamps[c] >= timeToDisconnect)
                            { //if timestamp is old enough, check for connection.
                                connected[c] = IsConnected(client.Client);
                                if (!connected[c])
                                {
                                    netStream.Close();
                                    clients[c].Close();
                                    numConnections--;
                                    if (onTimeout != null) onTimeout(c);
                                }
                                else
                                {
                                    timestamps[c] = now;
                                }
                            }
                        }

                    }
                    catch (Exception s)
                    {
                        Logger.Log(s);
                        if (showErrorMessages)
                            MessageBox.Show("Client " + (int)c + ": " + s, "Error", MessageBoxButtons.OK);
                    }
                    #endregion

                    Thread.Sleep(threadLatency);

                    #region Send
                    //get a copy of all the packets currently in the queue
                    IPacket[] toSend;
                    lock (packetQueues[c])
                    {
                        if (packetQueues[c].Count > SEND_MAX)
                        {
                            toSend = packetQueues[c].GetRange(0, SEND_MAX).ToArray();
                            packetQueues[c].RemoveRange(0, SEND_MAX);
                        }
                        else
                        {
                            toSend = packetQueues[c].ToArray();
                            packetQueues[c].RemoveRange(0, toSend.Length);
                        }
                    }
                    if (toSend != null && toSend.Length > 0)
                    { //write the packets to the network stream
                        try
                        {
                            writer.Write(toSend.Length);
                        }
                        catch (Exception e)
                        {
                            Logger.Log(e);
                            if (showErrorMessages)
                                MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK);
                        }
                        for (int i = 0; i < toSend.Length; i++)
                        {
                            try
                            {
                                toSend[i].Write(writer);
                                if (onSend != null)
                                {
                                    object[] args = new object[2];
                                    args[0] = c;
                                    args[1] = toSend[i];
                                    onSend(args);
                                }
                            }
                            catch (Exception e)
                            {
                                Logger.Log(e);
                                if (showErrorMessages)
                                    MessageBox.Show("Client " + (int)c + ": " + e, "Error", MessageBoxButtons.OK);
                            }
                        }
                    }
                    #endregion
                }
            }
            catch (ThreadAbortException tae) 
            { 
                Logger.Log(tae); 
                MessageBox.Show("Thread " + (int)cl + " was aborted.", "Error", MessageBoxButtons.OK); 
            }
        }   

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

属性 2024-12-29 18:58:17

这可能是您的代码,但我们很难说,因为它不完整。

我在 .NET TCP/ 中编写了自己的一组最佳实践IP 常见问题解答 - 经过多年的 TCP/IP 经验。我建议你从那开始。

PS 我保留术语“数据包”来表示在线数据包。 TCP 应用程序无法控制数据包。我使用术语“消息”来表示应用程序协议级消息。我认为这可以减少混乱,尤其是对于新手来说。

It is probably your code, but it's difficult for us to say as it's incomplete.

I wrote up my own set of best practices in a .NET TCP/IP FAQ - after many, many years of TCP/IP experience. I recommend you start with that.

P.S. I reserve the term "packet" for packets on-the-wire. A TCP app has no control over packets. I use the term "message" for application-protocol-level messages. I think this reduces confusion, especially for newcomers.

束缚m 2024-12-29 18:58:17

如果您正在尝试创建一个

我可以在任何应用程序中使用的 C# 网络库

您知道现有的开源库吗? networkComms.net 可能是一个好的开始。如果你能重现同样的问题,我会感到非常惊讶。我个人用它来维护超过 1000 个并发连接,每个连接每秒发送大约 10 个数据包。否则,如果您想继续使用您的代码,也许查看 networkComms.net 的 可以指出你可能出错的地方。

If you are trying to create a

networking library in C# that I can use in any application

were you aware of any existing open source libraries out there? networkComms.net is possibly a good start. If you can recreate the same problem with that i'd be very surprised. I've personally used it to maintain over 1000 concurrent connections each sending about 10 packets a second. Otherwise if you want to keep using your code perhaps looking at the source of networkComms.net can point out where you might be going wrong.

把时间冻结 2024-12-29 18:58:17

没有仔细查看您的代码片段,但我看到您在那里进行了分配 - 您是否检查过您对垃圾收集器施加的压力?

PS:(发送线程睡眠 1 毫秒) - 请记住,没有 timeBeginPeriod() 的 Sleep() 不会获得 1 毫秒的分辨率 - 可能接近 10-20 毫秒,具体取决于 Windows 版本和硬件。

Didn't look closely at your code snippets, but I see you have allocation in there - have you checked what pressure you're putting on the garbage collector?

PS: (sending thread sleeps 1 ms) - keep in mind that Sleep() without timeBeginPeriod() isn't going to get your 1ms resolution - probably closer to 10-20ms depending on Windows version and hardware.

双马尾 2024-12-29 18:58:17

不太了解 C# 并且此代码不完整。如果我做对了,那么

readPacket.Read(reader); 

将读取任何可用的内容,并且您的接收器端 for 循环将被击倒。您在哪里检查读取的字节数?

无论如何,检查 TCP 级别及更低级别发生的情况的一个好方法是 wireshark

Don't really know about C# and this code is incomplete. If I get it right, then

readPacket.Read(reader); 

will read whatever is available, and your receiver end for loop will be knocked over. Where are you checking the read amount of bytes ?

Anyway, a good way to check on what's happening at TCP level and lower is wireshark

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文