TCP client guidance, reconnect functionality, etc.
I have a TCP client that mainly runs on Mono and I would like some guidance with it. I think I am doing some things wrong, doing some things that are not needed, and so on.
The code below is part of what I use and serves as a sample for my doubts.
As you can see, I instantiate the ConcurrentQueues when the constructor is called. Should I initialize them at their declaration instead of in the constructor, is the way I am currently doing it the correct one, or does it not really matter?
I currently have 3 threads running, which I believe I could reduce to 2 or even 1, but I am a bit unsure about doing that.
As you can see I have:
receiveThread, for _ReceivePackets. This one controls all data received from the roomserver.
sendThread, for _SendPackets. This one controls everything that must be sent to the roomserver.
responseThread, for _Response. This one handles all the responses that were queued from the roomserver.
I believe I could merge _SendPackets with _ReceivePackets into one and add to my SendPackets class whether it is a packet to be sent or one that was delivered. What I am afraid of is whether, with heavy in/out traffic, it would still keep up without messing things up.
I keep _Response separate because it will do more of the per-reply-type processing of the response data, which I think is fine. I don't think it would work out if I removed it and handled the responses directly, since some packets won't be read in just one shot.
How far should I rely on _socket.Connected? I am having some issues implementing a reconnect. Most of the time, when I have a connection issue, no error is triggered; the socket just sits there with the port open as if it were still connected. How should I detect whether I am still connected?
Any overall recommendations, advice, or free online reading material?
Side note: this is a very basic implementation of a chat TCP client that I am currently working on for learning purposes.
using System;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
using System.Net.Sockets;
using System.Collections.Concurrent;
using log4net;

namespace Connection
{
    public class Roomserver
    {
        private static readonly ILog logger = LogManager.GetLogger(typeof(Roomserver));
        private ConcurrentQueue<byte[]> RoomserverReceivedPackets = null;
        private ConcurrentQueue<SendPackets> RoomserverSendPackets = null;
        private AutoResetEvent _queueNotifier = new AutoResetEvent(false);
        private AutoResetEvent _sendQueueNotifier = new AutoResetEvent(false);
        public static byte[] myinfo = null;
        private IPAddress _server = null;
        private int _port = 0;
        private int _roomID = 0;
        private Socket _socket;
        private Status _status = Status.Disconnected;
        private Thread responseThread = null;
        private Thread receiveThread = null;
        private Thread sendThread = null;
        private EndPoint _roomServer = null;

        public bool Connected
        {
            get { return _socket.Connected; }
        }

        public Status GetStatus
        {
            get { return _status; }
        }

        public Roomserver(IPAddress server, int port)
        {
            this._server = server;
            this._port = port;
            RoomserverReceivedPackets = new ConcurrentQueue<byte[]>();
            RoomserverSendPackets = new ConcurrentQueue<SendPackets>();
        }

        public Roomserver(IPAddress server, int port, int roomID)
        {
            this._server = server;
            this._port = port;
            this._roomID = roomID;
            RoomserverReceivedPackets = new ConcurrentQueue<byte[]>();
            RoomserverSendPackets = new ConcurrentQueue<SendPackets>();
        }

        public bool Connect()
        {
            try
            {
                if (_status != Status.Disconnected)
                    this.Disconnect();
                _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                IPEndPoint remoteEndPoint = new IPEndPoint(_server, _port);
                _socket.Connect(remoteEndPoint);
                _status = Status.Connect;
                _roomServer = (EndPoint)remoteEndPoint;
                receiveThread = new Thread(_ReceivePackets);
                receiveThread.Start();
                sendThread = new Thread(_SendPackets);
                sendThread.Start();
                responseThread = new Thread(_Response);
                responseThread.Start();
                return _socket.Connected;
            }
            catch (SocketException se)
            {
                logger.Error("Connect: " + se.ToString());
                _status = Status.Disconnected;
                return false;
            }
            catch (Exception ex)
            {
                logger.Error("Connect: " + ex.ToString());
                _status = Status.Disconnected;
                return false;
            }
        }

        public bool Disconnect()
        {
            if (_socket.Connected)
            {
                _status = Status.Disconnected;
                if (receiveThread != null && receiveThread.IsAlive)
                {
                    receiveThread.Abort();
                }
                if (responseThread != null && responseThread.IsAlive)
                {
                    responseThread.Abort();
                }
                if (sendThread != null && sendThread.IsAlive)
                {
                    sendThread.Abort();
                }
                try
                {
                    _socket.Close();
                    return true;
                }
                catch (Exception ex)
                {
                    logger.Info("Disconnect " + ex.ToString());
                    _status = Status.Disconnected;
                    return true;
                }
            }
            else
            {
                logger.Info("Not connected ...");
                _status = Status.Disconnected;
                return true;
            }
        }

        public bool SendData(byte[] bytes, bool delay)
        {
            try
            {
                SendPackets data = new SendPackets()
                {
                    Data = bytes,
                    Delay = delay
                };
                RoomserverSendPackets.Enqueue(data);
                _sendQueueNotifier.Set();
                return true;
            }
            catch (Exception ex)
            {
                logger.Error("SendData " + ex.ToString());
                return false;
            }
        }

        private void _SendPackets()
        {
            while (_socket.Connected)
            {
                _sendQueueNotifier.WaitOne();
                while (!RoomserverSendPackets.IsEmpty)
                {
                    SendPackets packet = null;
                    if (RoomserverSendPackets.TryDequeue(out packet))
                    {
                        try
                        {
                            if (packet.Delay)
                            {
                                Thread.Sleep(1000);
                                _socket.Send(packet.Data);
                            }
                            else
                                _socket.Send(packet.Data);
                        }
                        catch (SocketException soe)
                        {
                            logger.Error(soe.ToString());
                        }
                    }
                }
            }
        }

        private void _ReceivePackets()
        {
            bool extraData = false;
            MemoryStream fullPacket = null;
            int fullPacketSize = 0;
            while (_socket.Connected)
            {
                try
                {
                    byte[] bytes = new byte[65536];
                    int bytesRead = _socket.ReceiveFrom(bytes, ref _roomServer);
                    int packetSize = 0;
                    int reply = 0;
                    byte[] data = new byte[bytesRead];
                    Array.Copy(bytes, data, bytesRead);
                    MemoryStream bufferReceived = new MemoryStream(data, 0, data.Length);
                    using (var reader = new BinaryReader(bufferReceived))
                    {
                        packetSize = (int)reader.ReadInt32() + 4;
                        reply = (int)reader.ReadByte();
                    }
                    if (!extraData && packetSize <= bytesRead)
                    {
                        if (data.Length > 0)
                        {
                            RoomserverReceivedPackets.Enqueue(data);
                            _queueNotifier.Set();
                        }
                    }
                    else
                    {
                        if (!extraData)
                        {
                            fullPacket = new MemoryStream(new byte[packetSize], 0, packetSize);
                            fullPacket.Write(data, 0, data.Length);
                            fullPacketSize = data.Length;
                            extraData = true;
                        }
                        else
                        {
                            if (fullPacketSize < fullPacket.Length)
                            {
                                int left = (int)fullPacket.Length - fullPacketSize;
                                fullPacket.Write(data, 0, (left < data.Length) ? left : data.Length);
                                fullPacketSize += (left < data.Length) ? left : data.Length;
                                if (fullPacketSize >= fullPacket.Length)
                                {
                                    extraData = false;
                                    RoomserverReceivedPackets.Enqueue(fullPacket.ToArray());
                                    _queueNotifier.Set();
                                    fullPacket.Close();
                                }
                            }
                        }
                    }
                }
                catch (SocketException soe)
                {
                    logger.Error("_ReceivePackets " + soe.ToString());
                }
                catch (Exception ex)
                {
                    logger.Error("_ReceivePackets " + ex.ToString());
                }
            }
        }

        private void _Response()
        {
            while (_socket.Connected)
            {
                _queueNotifier.WaitOne();
                while (!RoomserverReceivedPackets.IsEmpty)
                {
                    byte[] data = null;
                    if (RoomserverReceivedPackets.TryDequeue(out data))
                    {
                        MemoryStream bufferReceived = new MemoryStream(data, 0, data.Length);
                        using (var reader = new BinaryReader(bufferReceived))
                        {
                            int packetSize = (int)reader.ReadInt32();
                            byte reply = reader.ReadByte();
                            switch (reply)
                            {
                                case 0x01: // Login request
                                    break;
                                case 0x02: // Login accepted
                                    break;
                                case 0x03: // Enter room
                                    break;
                                case 0x04: // Members list
                                    break;
                                case 0x05: // Send Chat
                                    break;
                                case 0x06: // Receive Chat
                                    break;
                                case 0x07: // Receive Announcement
                                    break;
                                case 0x08: // Send Announcement
                                    break;
                                case 0x09: // Wrong password errors
                                    _status = Status.RoomError;
                                    break;
                                case 0x10: // Send Whisper
                                    break;
                                case 0x11: // Receive Whisper
                                    break;
                                case 0x12: // Leave Room
                                    break;
                                case 0x13: // Disconnect
                                    break;
                            }
                        }
                    }
                }
            }
        }
    }
}
In other classes I have:
public class SendPackets
{
    public byte[] Data { get; set; }
    public bool Delay { get; set; }
}

public enum Status
{
    Disconnected = 0,
    Connect,
    EnterRequest,
    RoomError,
    Connected
}
Comments (3)
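Use a Dictionary<int, ICommandHandler> instead of your switch statement.
Then come back and ask a more specific question if you want a more specific answer.
Update to answer the comment
Instead of handling every reply code in one switch statement, give each reply code its own ICommandHandler, register the handlers in the dictionary, and have your socket class look up the handler for the reply code it has just read and invoke it.
Note that this is far from complete, and you might want to pass a context class to the handlers instead of just the memory stream.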
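The dictionary-based dispatch described above could look roughly like the sketch below. Only the Dictionary<int, ICommandHandler> type comes from the answer. The shape of ICommandHandler, the Handle method, the WrongPasswordHandler class and the PacketDispatcher class are all hypothetical names chosen for illustration, and the header layout (4-byte length followed by a 1-byte reply code) is taken from the question's _Response method.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical interface: one implementation per reply code.
public interface ICommandHandler
{
    // The reader is positioned just after the reply byte.
    void Handle(BinaryReader reader);
}

// Hypothetical handler for reply 0x09 ("wrong password" in the question).
public sealed class WrongPasswordHandler : ICommandHandler
{
    public bool Triggered { get; private set; }

    public void Handle(BinaryReader reader)
    {
        Triggered = true;
    }
}

// Hypothetical dispatcher that replaces the switch statement.
public sealed class PacketDispatcher
{
    private readonly Dictionary<int, ICommandHandler> _handlers =
        new Dictionary<int, ICommandHandler>();

    public void Register(int reply, ICommandHandler handler)
    {
        _handlers[reply] = handler;
    }

    // Reads the header (4-byte length, 1-byte reply code) and forwards the
    // rest of the packet to the registered handler.
    // Returns false when no handler is registered for the reply code.
    public bool Dispatch(byte[] packet)
    {
        using (var reader = new BinaryReader(new MemoryStream(packet)))
        {
            reader.ReadInt32();             // declared packet size, unused here
            int reply = reader.ReadByte();

            ICommandHandler handler;
            if (!_handlers.TryGetValue(reply, out handler))
                return false;

            handler.Handle(reader);
            return true;
        }
    }
}
```

With something like this in place, supporting a new reply code means writing one more handler class and registering it, rather than growing the switch.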
The Connected property gives you information about the status of the socket as of the last operation, so if the socket has changed state since you last tried to read or write, Connected will give you the wrong (old) state.
As per the documentation, you should make a non-blocking, zero-length Send call to get .NET to update the socket status. The success of this Send operation will tell you whether the socket is still connected.
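A minimal sketch of the zero-length send probe follows, modeled on the pattern in the Socket.Connected documentation. The SocketProbe class and the IsStillConnected method are hypothetical names. Keep in mind that this only reflects what the local network stack currently knows: a link that died silently may still look connected until real traffic, or an application-level heartbeat, fails.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical helper; the name and shape are illustrative only.
public static class SocketProbe
{
    // Returns true if a non-blocking, zero-byte Send succeeds or would block,
    // which the Socket.Connected documentation treats as "still connected".
    public static bool IsStillConnected(Socket socket)
    {
        try
        {
            bool wasBlocking = socket.Blocking;
            socket.Blocking = false;
            try
            {
                // Zero-byte send: no payload is transmitted.
                socket.Send(new byte[1], 0, SocketFlags.None);
                return true;
            }
            catch (SocketException ex)
            {
                // WouldBlock (10035) means the socket is alive but busy.
                return ex.SocketErrorCode == SocketError.WouldBlock;
            }
            finally
            {
                // Restore the caller's blocking mode.
                socket.Blocking = wasBlocking;
            }
        }
        catch (ObjectDisposedException)
        {
            return false;   // the socket was already closed locally
        }
        catch (SocketException)
        {
            return false;   // the blocking mode could not be changed
        }
    }
}
```

A reconnect routine could call this before each send, or on a timer, and start a fresh Connect when it returns false.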
Typically, only one thread is needed for communication in an application such as this. If all your application does is chat, then the entire thing could be single-threaded. A blocking read or write would block your console, but you could get around that by making asynchronous read/write calls or by putting timeouts on blocking operations. In my opinion, you got a little overzealous with the threading. The advice I give to new programmers is this: if you aren't sure whether you need multiple threads, start with a single-threaded approach, and switch only when you find areas where there is blocking or where performance could be improved with multithreading. Don't do it up front.
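One way to keep a single-threaded loop from stalling on a read is to bound the wait before calling Receive. The sketch below uses Socket.Poll for that instead of the ReceiveTimeout property; this is a substitution on my part, not something the answer prescribes. TimedReceive and TryReceive are hypothetical names.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical helper; the name and return convention are illustrative only.
public static class TimedReceive
{
    // Waits up to timeoutMs milliseconds for the socket to become readable.
    // Returns -1 if nothing arrived in time, 0 if the peer closed the
    // connection, otherwise the number of bytes copied into buffer.
    public static int TryReceive(Socket socket, byte[] buffer, int timeoutMs)
    {
        // Poll takes microseconds; keep timeoutMs small enough not to overflow.
        if (!socket.Poll(timeoutMs * 1000, SelectMode.SelectRead))
            return -1;

        // Poll reported readable: Receive returns without a long wait.
        return socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
    }
}
```

A single loop could then alternate between draining the send queue and calling TryReceive with a short timeout, which removes the need for separate send and receive threads.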
I see you use ReceiveFrom, which is intended for connectionless protocols. Try using the basic Receive instead. You should specify the number of bytes you want to receive; otherwise you risk overflowing the receive buffer. In C# this manifests itself as a SocketException, and you would have to dig through the WinSock 2 API to figure out what the error code means. It is much better to specify a maximum size to receive and put the receive in a loop.
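Putting Receive in a loop with an explicit size could look like the sketch below. PacketReader, ReceiveExact, ReceivePacket and the maxBodySize parameter are hypothetical names. The framing is an assumption based on the question's code, where the 4-byte little-endian length prefix appears to count the bytes that follow it (hence the "+ 4" in _ReceivePackets).

```csharp
using System;
using System.IO;
using System.Net.Sockets;

// Hypothetical helper; names and framing assumptions are illustrative only.
public static class PacketReader
{
    // Loops until exactly count bytes are read into buffer.
    // Returns false if the peer closed the connection first.
    public static bool ReceiveExact(Socket socket, byte[] buffer, int count)
    {
        int offset = 0;
        while (offset < count)
        {
            // Never ask for more than is still missing.
            int read = socket.Receive(buffer, offset, count - offset, SocketFlags.None);
            if (read == 0)
                return false;       // orderly shutdown by the peer
            offset += read;
        }
        return true;
    }

    // Reads one length-prefixed packet and returns its body (reply byte
    // included), or null if the connection was closed.
    public static byte[] ReceivePacket(Socket socket, int maxBodySize)
    {
        byte[] header = new byte[4];
        if (!ReceiveExact(socket, header, header.Length))
            return null;

        // Little-endian decode, matching BinaryReader.ReadInt32.
        int bodySize = header[0] | (header[1] << 8) | (header[2] << 16) | (header[3] << 24);
        if (bodySize < 0 || bodySize > maxBodySize)
            throw new InvalidDataException("Unexpected packet size: " + bodySize);

        byte[] body = new byte[bodySize];
        if (!ReceiveExact(socket, body, bodySize))
            return null;
        return body;
    }
}
```

Because each call returns exactly one complete packet, the extraData and fullPacket bookkeeping in the question's receive loop would no longer be needed, and a null result is a natural trigger for the reconnect logic.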
And I will echo what another responder has said: use the single responsibility principle. Design each class to have one job only. For your design, I would start with a class that encapsulates socket communications at a higher level for your application. Then I would derive a server class, and maybe a client class, from it. You could then use those classes in your "RoomServer" and "RoomClient" classes. This separation of concerns should force you to model each object after a real-world object, such as talkers and listeners. It makes you think about what each one needs, and extraneous member variables unrelated to a class's primary job need to be removed from that class and given a better home.