c#client-server插座线程中的通信

发布于 2025-02-09 05:01:45 字数 22126 浏览 3 评论 0原文

我创建了这些应用程序。一个是服务器套接字,由负责创建客户端/服务器连接和创建其他线程的主线程组成:

  • 消息的侦听消息线程
  • 为了从客户端
  • 线程收到

,如果有连接中断的服务器释放任何资源和等待循环中的新连接。

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

public class SynchronousSocketListener {

    private Socket handler; //socket handlet
    private byte[] bytes; // Data buffer for incoming data.  
    private List < String > messagesToSend;
    private Mutex messagesToSendMutex;

    public SynchronousSocketListener() {
        messagesToSendMutex = new Mutex();

        try {
            messagesToSendMutex.WaitOne();
            messagesToSend = new List < String > ();
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }

        bytes = new Byte[1024];
        handler = null;
    }

    private void addMessageToQueue(string message) {

        try {
            messagesToSendMutex.WaitOne();
            messagesToSend.Add(message);
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }
    }

    private string readMessageFromQueue() {

        string ret = null;
        try {
            messagesToSendMutex.WaitOne();
            if (messagesToSend.Count > 0)
                ret = messagesToSend[0];
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }

        return ret;
    }

    private void removeMessageFromQueue(string messageToRemove) {

        string ret = null;
        try {
            messagesToSendMutex.WaitOne();
            if (messagesToSend.Count > 0)
                messagesToSend.Remove(messageToRemove);
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }
    }

    private void threadForGeneratingMessages() {
        while (true) {
            addMessageToQueue(Console.ReadLine());
        }
    }

    private void sendingThread(CancellationToken ct) {
        while (true) {
            try {
                if (ct.IsCancellationRequested) {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                    Console.WriteLine("Task sending is cancelled");
                    return;
                }
            } catch (System.OperationCanceledException ex) {
                Console.WriteLine("System.OperationCanceledException");
                return;
            }

            if (messagesToSend.Count <= 0) {
                Console.WriteLine("No more messages to send");
                Thread.Sleep(5000);
                continue;
            }

            Console.WriteLine("Server is going to write some data for client");

            //send pending messages to client

            string dataToSendNow = messagesToSend[0];
            byte[] msg = Encoding.ASCII.GetBytes(dataToSendNow + "<EOF>");

            try {
                // Send the data through the socket.  

                int bytesSent = handler.Send(msg);
                messagesToSend.Remove(dataToSendNow);

                Console.WriteLine("Server send data");
            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());

                Console.WriteLine("returning from sendingThread sockEx");
                return;

            } catch (ObjectDisposedException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                Console.WriteLine("returning from sendingThread objDisEx");
                return;
            }

            Thread.Sleep(100);
        }
    }

    private void receivingThread(CancellationToken ct) {
        while (true) {
            try {
                if (ct.IsCancellationRequested) {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                    Console.WriteLine("Task receiving is cancelled");
                    return;
                }
            } catch (System.OperationCanceledException ex) {
                Console.WriteLine("System.OperationCanceledException");
                return;
            }

            Console.WriteLine("Server is waiting for a new message to arrive: ");
            try {
                if (!handler.Connected) throw new SocketException();
                //handler.Send(new byte[] { 0 });
                int bytesRec = handler.Receive(bytes);
                string receivedData = Encoding.ASCII.GetString(bytes, 0, bytesRec);

                Console.WriteLine("Server has received message = {0}",
                    receivedData);

                /*
                //do some stuff with the message
                .
                .
                .
                */

            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                return;
            } catch (ObjectDisposedException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                return;
            }
        }
    }

    public void StartListening() {
        // Establish the local endpoint for the socket.  
        // Dns.GetHostName returns the name of the
        // host running the application.  
        IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
        IPAddress ipAddress = ipHostInfo.AddressList[0];

        while (true) {

            // Create a TCP/IP socket.  
            // Bind the socket to the local endpoint and
            // listen for incoming connections. 

            Socket listener = new Socket(ipAddress.AddressFamily,
                SocketType.Stream, ProtocolType.Tcp);
            Console.WriteLine("New connection can be made");

            try {
                IPEndPoint localEndPoint = new IPEndPoint(ipAddress, 11100);
                listener.Bind(localEndPoint);
                listener.Listen(10);

                // Start listening for connections.  

                Console.WriteLine("Waiting for a connection...");
                // Program is suspended while waiting for an incoming connection.  
                handler = listener.Accept();
                Console.WriteLine("New connection is made");

                var tokenSource2 = new CancellationTokenSource();
                CancellationToken ct = tokenSource2.Token;

                //when new connection is made create new thread for sending data through socket
                var task_send = Task.Run(() => {
                    sendingThread(ct);

                }, tokenSource2.Token);

                var tokenSource3 = new CancellationTokenSource();
                CancellationToken ct3 = tokenSource3.Token;

                //when new connection is made create new thread for receiving data through socket
                var task = Task.Run(() => {
                    receivingThread(ct3);
                }, tokenSource3.Token);

                while (true) {
                    if (task.IsCompleted || task_send.IsCompleted) { //activelly oolling the field to find out, wether threads has been cancelled/returned
                        Console.WriteLine("some tasks is Completed");
                        tokenSource2.Cancel();
                        tokenSource3.Cancel();
                        handler.Shutdown(SocketShutdown.Both);
                        handler.Close();
                        break; //breaking from polling loop to prepare for a new connection
                    }
                }

            } catch (SocketException se) {
                Console.WriteLine(se.ToString());
            } catch (Exception e) {
                Console.WriteLine(e.ToString());
            }

            listener.Dispose(); //disposing listener so that new one can be created
        }

        Console.WriteLine("\nPress ENTER to continue...");
        Console.Read();

    }

    public static int Main(String[] args) {
        SynchronousSocketListener synSocList = new SynchronousSocketListener();
        var task_generateMsg = Task.Run(
            (synSocList.threadForGeneratingMessages));

        synSocList.StartListening();

        return 0;
    }
}

客户端的应用结构与服务器基本相同,并且如果连接已中断,它正在积极尝试连接到服务器:

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

public class SynchronousSocketClient {
    private Socket sender; //socket
    private byte[] bytes; // Data buffer for incoming data.  
    private List < String > messagesToSend;
    private Mutex messagesToSendMutex;

    public SynchronousSocketClient() {
        messagesToSendMutex = new Mutex();

        try {
            messagesToSendMutex.WaitOne();
            messagesToSend = new List < String > ();
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }

        bytes = new Byte[1024];
        sender = null;
    }

    private void addMessageToQueue(string message) {

        try {
            messagesToSendMutex.WaitOne();
            messagesToSend.Add(message);
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }
    }

    private string readMessageFromQueue() {

        string ret = null;
        try {
            messagesToSendMutex.WaitOne();
            if (messagesToSend.Count > 0)
                ret = messagesToSend[0];
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }

        return ret;
    }

    private void removeMessageFromQueue(string messageToRemove) {

        string ret = null;
        try {
            messagesToSendMutex.WaitOne();
            if (messagesToSend.Count > 0)
                messagesToSend.Remove(messageToRemove);
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }
    }

    private void threadForGeneratingMessages() {
        while (true) {
            addMessageToQueue(Console.ReadLine());
        }
    }

    private void sendingThread(CancellationToken ct) {
        while (true) {
            try {
                if (ct.IsCancellationRequested) {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                    Console.WriteLine("Task sending is cancelled");
                    return;
                }
            } catch (System.OperationCanceledException ex) {
                Console.WriteLine("System.OperationCanceledException");
                return;
            }

            if (messagesToSend.Count <= 0) {
                Console.WriteLine("No more messages to send");
                Thread.Sleep(5000);
                continue;
            }

            Console.WriteLine("Client is going to write some data for client");

            //send pending messages to client

            string dataToSendNow = messagesToSend[0];
            byte[] msg = Encoding.ASCII.GetBytes(dataToSendNow + "<EOF>");

            try {
                // Send the data through the socket.  

                int bytesSent = sender.Send(msg);
                messagesToSend.Remove(dataToSendNow);

                Console.WriteLine("Client send data");
            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());

                Console.WriteLine("returning from sendingThread sockEx");
                return;

            } catch (ObjectDisposedException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                Console.WriteLine("returning from sendingThread objDisEx");
                return;
            }

        }
    }

    private void receivingThread(CancellationToken ct) {
        while (true) {
            try {
                if (ct.IsCancellationRequested) {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                    Console.WriteLine("Task receiving is cancelled");
                    return;
                }
            } catch (System.OperationCanceledException ex) {
                Console.WriteLine("System.OperationCanceledException");
                return;
            }

            Console.WriteLine("Client is waiting for a new message to arrive: ");
            try {
                if (!sender.Connected) throw new SocketException();
                //sender.Send(new byte[] { 0 });
                int bytesRec = sender.Receive(bytes);
                string receivedData = Encoding.ASCII.GetString(bytes, 0, bytesRec);

                Console.WriteLine("Client has received message = {0}",
                    receivedData);

                /*
                //do some stuff with the message
                .
                .
                .
                */

            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                return;
            } catch (ObjectDisposedException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                return;
            }
        }
    }

    private void sendingThread() {
        while (true) {
            Console.WriteLine("Write some data for server");

            string line = Console.ReadLine();
            byte[] msg = Encoding.ASCII.GetBytes(line + "<EOF>");

            try {
                // Send the data through the socket.  
                int bytesSent = sender.Send(msg);

                Console.WriteLine("data are send");
                // Receive the response from the remote device. 
            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());
                socketError = false;
                return;

            }

            Thread.Sleep(100);
        }
    }

    volatile private bool socketError = false;

    private void receivingThread() {

        while (true) {
            Console.WriteLine("receiving:");
            try {

                if (!sender.Connected) throw new SocketException();
                sender.Send(new byte[] {
                    0
                });
                int bytesRec = sender.Receive(bytes);
                Console.WriteLine("Echoed test = {0}",
                    Encoding.ASCII.GetString(bytes, 0, bytesRec));
            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());
                socketError = true;
                return;
            }
        }
    }

    public void StartClient() {
        // Data buffer for incoming data.  

        while (true) {
            // Connect to a remote device.  

            try {
                // Establish the remote endpoint for the socket.  
                // This example uses port 11000 on the local computer.  
                IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
                IPAddress ipAddress = ipHostInfo.AddressList[0];
                IPEndPoint remoteEP = new IPEndPoint(ipAddress, 11100);

                // Create a TCP/IP  socket.  
                sender = new Socket(ipAddress.AddressFamily,
                    SocketType.Stream, ProtocolType.Tcp);

                // Connect the socket to the remote endpoint. Catch any errors.  
                try {

                    sender.Connect(remoteEP);

                    Console.WriteLine("Client has made connection to the server");

                    /*
                    try
                    {
                        task_send.Wait();
                    }
                    catch (AggregateException ae)
                    {
                        Console.WriteLine(ae.ToString());
                        foreach (var ex in ae.InnerExceptions)
                        {
                            throw ex;
                        }
                    }
                    */

                    var tokenSource2 = new CancellationTokenSource();
                    CancellationToken ct = tokenSource2.Token;

                    //when new connection is made create new thread for sending data through socket
                    var task_send = Task.Run(() => {
                        sendingThread(ct);

                    }, tokenSource2.Token);

                    var tokenSource3 = new CancellationTokenSource();
                    CancellationToken ct3 = tokenSource3.Token;

                    //when new connection is made create new thread for receiving data through socket
                    var task = Task.Run(() => {
                        receivingThread(ct3);
                    }, tokenSource3.Token);

                    while (true) {
                        if (task.IsCompleted || task_send.IsCompleted) { //activelly oolling the field to find out, wether threads has been cancelled/returned
                            Console.WriteLine("some tasks is Completed");
                            tokenSource2.Cancel();
                            tokenSource3.Cancel();
                            sender.Shutdown(SocketShutdown.Both);
                            sender.Close();
                            break; //breaking from polling loop to prepare for a new connection
                        }
                    }

                    /*
                    var task_send = Task.Run(
                        (sendingThread));

                    var task = Task.Run(
                        (receivingThread));
                    
                    */
                    /*
                    try
                    {
                        task.Wait();
                    }
                    catch (AggregateException ae)
                    {
                        foreach (var ex in ae.InnerExceptions)
                        {
                            throw ex;
                        }
                    }*/

                    /*

                    bool task_receiving_res = false;
                    //int timeout = 60000;
                    Task<bool> task_receiving = new Task<bool>(() => receivingThread());
                    task_receiving.Start();
                    Console.WriteLine("here0");

                    if (await Task.WhenAny(task_receiving, Task.Delay(Timeout.Infinite)) == task_receiving)
                    {
                        Console.WriteLine("here-0");
                        // task completed within timeout
                        task_receiving_res = task_receiving.Result;
                    }
                    else
                    {
                        throw new TimeoutException();
                    }

                    Console.WriteLine("here");

                    bool task_sending_res = false;
                    //int timeout = 60000;
                    Task<bool> task_sending = new Task<bool>(() => sendingThread());
                    task_sending.Start();
                    if (await Task.WhenAny(task_sending, Task.Delay(Timeout.Infinite)) == task_sending)
                    {
                        // task completed within timeout
                        task_sending_res = task_sending.Result;
                    }
                    else
                    {
                        throw new TimeoutException();
                    }
                    */
                    /*
                    Console.WriteLine("here1");

                    while (true) {
                        if (socketError) {
                            /*task.Dispose();
                            task_send.Dispose();
                            
                            */
                    /*
                    socketError = false;
                            throw new Exception("restart connection");
                        }
                        
                    }*/

                    /*
                    while (true)
                    {
                        */
                    /*Console.WriteLine("Socket connected to {0}",
                        sender.RemoteEndPoint.ToString());*/
                    /*
                    // Encode the data string into a byte array.  
                    byte[] msg = Encoding.ASCII.GetBytes("This is a test<EOF>");

                    // Send the data through the socket.  
                    int bytesSent = sender.Send(msg);
                    */
                    // Receive the response from the remote device.  
                    /*int bytesRec = sender.Receive(bytes);
                    Console.WriteLine("Echoed test = {0}",
                        Encoding.ASCII.GetString(bytes, 0, bytesRec));
                    */
                    /*
                    bytesRec = sender.Receive(bytes);
                    Console.WriteLine("Echoed test = {0}",
                        Encoding.ASCII.GetString(bytes, 0, bytesRec));
                    */
                    /*

                    Thread.Sleep(5000);


                }*/

                    // Release the socket.  
                    sender.Shutdown(SocketShutdown.Both);
                    sender.Close();

                } catch (ArgumentNullException ane) {
                    Console.WriteLine("ArgumentNullException : {0}", ane.ToString());
                } catch (SocketException se) {
                    Console.WriteLine("SocketException : {0}", se.ToString());
                } catch (Exception e) {
                    Console.WriteLine("Unexpected exception : {0}", e.ToString());
                }

            } catch (Exception e) {
                Console.WriteLine(e.ToString());
            }

            Thread.Sleep(5000);
        }

    }

    public static int Main(String[] args) {
        SynchronousSocketClient sc = new SynchronousSocketClient();

        var task_generateMsg = Task.Run(
            (sc.threadForGeneratingMessages));

        sc.StartClient();
        return 0;
    }
}

如何编写这些代码会更有效?这是安全的,还是这些代码(效率除外)是否有问题?


编辑: 我对代码有问题: 有时,接收线是在循环的,而无需停止在处理程序/sender.receive()函数,即使其他侧没有发送任何内容,它似乎也读取0字节。如何纠正?

I have created these applications. One is server socket which consists of main thread responsible for creating client/server connection and creating other threads:

  • thread for producing messages
  • thread for listening for received messages from client
  • thread to send produced messages by server

If there is connection disrupted server frees any resources and waits for new connections in a loop.

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

public class SynchronousSocketListener {

    private Socket handler; //socket handlet
    private byte[] bytes; // Data buffer for incoming data.  
    private List < String > messagesToSend;
    private Mutex messagesToSendMutex;

    public SynchronousSocketListener() {
        messagesToSendMutex = new Mutex();

        try {
            messagesToSendMutex.WaitOne();
            messagesToSend = new List < String > ();
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }

        bytes = new Byte[1024];
        handler = null;
    }

    private void addMessageToQueue(string message) {

        try {
            messagesToSendMutex.WaitOne();
            messagesToSend.Add(message);
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }
    }

    private string readMessageFromQueue() {

        string ret = null;
        try {
            messagesToSendMutex.WaitOne();
            if (messagesToSend.Count > 0)
                ret = messagesToSend[0];
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }

        return ret;
    }

    private void removeMessageFromQueue(string messageToRemove) {

        string ret = null;
        try {
            messagesToSendMutex.WaitOne();
            if (messagesToSend.Count > 0)
                messagesToSend.Remove(messageToRemove);
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }
    }

    private void threadForGeneratingMessages() {
        while (true) {
            addMessageToQueue(Console.ReadLine());
        }
    }

    private void sendingThread(CancellationToken ct) {
        while (true) {
            try {
                if (ct.IsCancellationRequested) {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                    Console.WriteLine("Task sending is cancelled");
                    return;
                }
            } catch (System.OperationCanceledException ex) {
                Console.WriteLine("System.OperationCanceledException");
                return;
            }

            if (messagesToSend.Count <= 0) {
                Console.WriteLine("No more messages to send");
                Thread.Sleep(5000);
                continue;
            }

            Console.WriteLine("Server is going to write some data for client");

            //send pending messages to client

            string dataToSendNow = messagesToSend[0];
            byte[] msg = Encoding.ASCII.GetBytes(dataToSendNow + "<EOF>");

            try {
                // Send the data through the socket.  

                int bytesSent = handler.Send(msg);
                messagesToSend.Remove(dataToSendNow);

                Console.WriteLine("Server send data");
            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());

                Console.WriteLine("returning from sendingThread sockEx");
                return;

            } catch (ObjectDisposedException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                Console.WriteLine("returning from sendingThread objDisEx");
                return;
            }

            Thread.Sleep(100);
        }
    }

    private void receivingThread(CancellationToken ct) {
        while (true) {
            try {
                if (ct.IsCancellationRequested) {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                    Console.WriteLine("Task receiving is cancelled");
                    return;
                }
            } catch (System.OperationCanceledException ex) {
                Console.WriteLine("System.OperationCanceledException");
                return;
            }

            Console.WriteLine("Server is waiting for a new message to arrive: ");
            try {
                if (!handler.Connected) throw new SocketException();
                //handler.Send(new byte[] { 0 });
                int bytesRec = handler.Receive(bytes);
                string receivedData = Encoding.ASCII.GetString(bytes, 0, bytesRec);

                Console.WriteLine("Server has received message = {0}",
                    receivedData);

                /*
                //do some stuff with the message
                .
                .
                .
                */

            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                return;
            } catch (ObjectDisposedException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                return;
            }
        }
    }

    public void StartListening() {
        // Establish the local endpoint for the socket.  
        // Dns.GetHostName returns the name of the
        // host running the application.  
        IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
        IPAddress ipAddress = ipHostInfo.AddressList[0];

        while (true) {

            // Create a TCP/IP socket.  
            // Bind the socket to the local endpoint and
            // listen for incoming connections. 

            Socket listener = new Socket(ipAddress.AddressFamily,
                SocketType.Stream, ProtocolType.Tcp);
            Console.WriteLine("New connection can be made");

            try {
                IPEndPoint localEndPoint = new IPEndPoint(ipAddress, 11100);
                listener.Bind(localEndPoint);
                listener.Listen(10);

                // Start listening for connections.  

                Console.WriteLine("Waiting for a connection...");
                // Program is suspended while waiting for an incoming connection.  
                handler = listener.Accept();
                Console.WriteLine("New connection is made");

                var tokenSource2 = new CancellationTokenSource();
                CancellationToken ct = tokenSource2.Token;

                //when new connection is made create new thread for sending data through socket
                var task_send = Task.Run(() => {
                    sendingThread(ct);

                }, tokenSource2.Token);

                var tokenSource3 = new CancellationTokenSource();
                CancellationToken ct3 = tokenSource3.Token;

                //when new connection is made create new thread for receiving data through socket
                var task = Task.Run(() => {
                    receivingThread(ct3);
                }, tokenSource3.Token);

                while (true) {
                    if (task.IsCompleted || task_send.IsCompleted) { //activelly oolling the field to find out, wether threads has been cancelled/returned
                        Console.WriteLine("some tasks is Completed");
                        tokenSource2.Cancel();
                        tokenSource3.Cancel();
                        handler.Shutdown(SocketShutdown.Both);
                        handler.Close();
                        break; //breaking from polling loop to prepare for a new connection
                    }
                }

            } catch (SocketException se) {
                Console.WriteLine(se.ToString());
            } catch (Exception e) {
                Console.WriteLine(e.ToString());
            }

            listener.Dispose(); //disposing listener so that new one can be created
        }

        Console.WriteLine("\nPress ENTER to continue...");
        Console.Read();

    }

    public static int Main(String[] args) {
        SynchronousSocketListener synSocList = new SynchronousSocketListener();
        var task_generateMsg = Task.Run(
            (synSocList.threadForGeneratingMessages));

        synSocList.StartListening();

        return 0;
    }
}

client's app structure is basically the same as server's with exception, that it is actively trying to connect to server, if connection has been disrupted:

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

public class SynchronousSocketClient {
    private Socket sender; //socket
    private byte[] bytes; // Data buffer for incoming data.  
    private List < String > messagesToSend;
    private Mutex messagesToSendMutex;

    public SynchronousSocketClient() {
        messagesToSendMutex = new Mutex();

        try {
            messagesToSendMutex.WaitOne();
            messagesToSend = new List < String > ();
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }

        bytes = new Byte[1024];
        sender = null;
    }

    private void addMessageToQueue(string message) {

        try {
            messagesToSendMutex.WaitOne();
            messagesToSend.Add(message);
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }
    }

    private string readMessageFromQueue() {

        string ret = null;
        try {
            messagesToSendMutex.WaitOne();
            if (messagesToSend.Count > 0)
                ret = messagesToSend[0];
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }

        return ret;
    }

    private void removeMessageFromQueue(string messageToRemove) {

        string ret = null;
        try {
            messagesToSendMutex.WaitOne();
            if (messagesToSend.Count > 0)
                messagesToSend.Remove(messageToRemove);
        } finally {
            messagesToSendMutex.ReleaseMutex();
        }
    }

    private void threadForGeneratingMessages() {
        while (true) {
            addMessageToQueue(Console.ReadLine());
        }
    }

    private void sendingThread(CancellationToken ct) {
        while (true) {
            try {
                if (ct.IsCancellationRequested) {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                    Console.WriteLine("Task sending is cancelled");
                    return;
                }
            } catch (System.OperationCanceledException ex) {
                Console.WriteLine("System.OperationCanceledException");
                return;
            }

            if (messagesToSend.Count <= 0) {
                Console.WriteLine("No more messages to send");
                Thread.Sleep(5000);
                continue;
            }

            Console.WriteLine("Client is going to write some data for client");

            //send pending messages to client

            string dataToSendNow = messagesToSend[0];
            byte[] msg = Encoding.ASCII.GetBytes(dataToSendNow + "<EOF>");

            try {
                // Send the data through the socket.  

                int bytesSent = sender.Send(msg);
                messagesToSend.Remove(dataToSendNow);

                Console.WriteLine("Client send data");
            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());

                Console.WriteLine("returning from sendingThread sockEx");
                return;

            } catch (ObjectDisposedException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                Console.WriteLine("returning from sendingThread objDisEx");
                return;
            }

        }
    }

    private void receivingThread(CancellationToken ct) {
        while (true) {
            try {
                if (ct.IsCancellationRequested) {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                    Console.WriteLine("Task receiving is cancelled");
                    return;
                }
            } catch (System.OperationCanceledException ex) {
                Console.WriteLine("System.OperationCanceledException");
                return;
            }

            Console.WriteLine("Client is waiting for a new message to arrive: ");
            try {
                if (!sender.Connected) throw new SocketException();
                //sender.Send(new byte[] { 0 });
                int bytesRec = sender.Receive(bytes);
                string receivedData = Encoding.ASCII.GetString(bytes, 0, bytesRec);

                Console.WriteLine("Client has received message = {0}",
                    receivedData);

                /*
                //do some stuff with the message
                .
                .
                .
                */

            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                return;
            } catch (ObjectDisposedException ex) {
                Console.WriteLine(ex.ToString());
                //socketError = true;
                return;
            }
        }
    }

    private void sendingThread() {
        while (true) {
            Console.WriteLine("Write some data for server");

            string line = Console.ReadLine();
            byte[] msg = Encoding.ASCII.GetBytes(line + "<EOF>");

            try {
                // Send the data through the socket.  
                int bytesSent = sender.Send(msg);

                Console.WriteLine("data are send");
                // Receive the response from the remote device. 
            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());
                socketError = false;
                return;

            }

            Thread.Sleep(100);
        }
    }

    volatile private bool socketError = false;

    private void receivingThread() {

        while (true) {
            Console.WriteLine("receiving:");
            try {

                if (!sender.Connected) throw new SocketException();
                sender.Send(new byte[] {
                    0
                });
                int bytesRec = sender.Receive(bytes);
                Console.WriteLine("Echoed test = {0}",
                    Encoding.ASCII.GetString(bytes, 0, bytesRec));
            } catch (SocketException ex) {
                Console.WriteLine(ex.ToString());
                socketError = true;
                return;
            }
        }
    }

    public void StartClient() {
        // Data buffer for incoming data.  

        while (true) {
            // Connect to a remote device.  

            try {
                // Establish the remote endpoint for the socket.  
                // This example uses port 11000 on the local computer.  
                IPHostEntry ipHostInfo = Dns.GetHostEntry(Dns.GetHostName());
                IPAddress ipAddress = ipHostInfo.AddressList[0];
                IPEndPoint remoteEP = new IPEndPoint(ipAddress, 11100);

                // Create a TCP/IP  socket.  
                sender = new Socket(ipAddress.AddressFamily,
                    SocketType.Stream, ProtocolType.Tcp);

                // Connect the socket to the remote endpoint. Catch any errors.  
                try {

                    sender.Connect(remoteEP);

                    Console.WriteLine("Client has made connection to the server");

                    /*
                    try
                    {
                        task_send.Wait();
                    }
                    catch (AggregateException ae)
                    {
                        Console.WriteLine(ae.ToString());
                        foreach (var ex in ae.InnerExceptions)
                        {
                            throw ex;
                        }
                    }
                    */

                    var tokenSource2 = new CancellationTokenSource();
                    CancellationToken ct = tokenSource2.Token;

                    //when new connection is made create new thread for sending data through socket
                    var task_send = Task.Run(() => {
                        sendingThread(ct);

                    }, tokenSource2.Token);

                    var tokenSource3 = new CancellationTokenSource();
                    CancellationToken ct3 = tokenSource3.Token;

                    //when new connection is made create new thread for receiving data through socket
                    var task = Task.Run(() => {
                        receivingThread(ct3);
                    }, tokenSource3.Token);

                    while (true) {
                        if (task.IsCompleted || task_send.IsCompleted) { //activelly oolling the field to find out, wether threads has been cancelled/returned
                            Console.WriteLine("some tasks is Completed");
                            tokenSource2.Cancel();
                            tokenSource3.Cancel();
                            sender.Shutdown(SocketShutdown.Both);
                            sender.Close();
                            break; //breaking from polling loop to prepare for a new connection
                        }
                    }

                    /*
                    var task_send = Task.Run(
                        (sendingThread));

                    var task = Task.Run(
                        (receivingThread));
                    
                    */
                    /*
                    try
                    {
                        task.Wait();
                    }
                    catch (AggregateException ae)
                    {
                        foreach (var ex in ae.InnerExceptions)
                        {
                            throw ex;
                        }
                    }*/

                    /*

                    bool task_receiving_res = false;
                    //int timeout = 60000;
                    Task<bool> task_receiving = new Task<bool>(() => receivingThread());
                    task_receiving.Start();
                    Console.WriteLine("here0");

                    if (await Task.WhenAny(task_receiving, Task.Delay(Timeout.Infinite)) == task_receiving)
                    {
                        Console.WriteLine("here-0");
                        // task completed within timeout
                        task_receiving_res = task_receiving.Result;
                    }
                    else
                    {
                        throw new TimeoutException();
                    }

                    Console.WriteLine("here");

                    bool task_sending_res = false;
                    //int timeout = 60000;
                    Task<bool> task_sending = new Task<bool>(() => sendingThread());
                    task_sending.Start();
                    if (await Task.WhenAny(task_sending, Task.Delay(Timeout.Infinite)) == task_sending)
                    {
                        // task completed within timeout
                        task_sending_res = task_sending.Result;
                    }
                    else
                    {
                        throw new TimeoutException();
                    }
                    */
                    /*
                    Console.WriteLine("here1");

                    while (true) {
                        if (socketError) {
                            /*task.Dispose();
                            task_send.Dispose();
                            
                            */
                    /*
                    socketError = false;
                            throw new Exception("restart connection");
                        }
                        
                    }*/

                    /*
                    while (true)
                    {
                        */
                    /*Console.WriteLine("Socket connected to {0}",
                        sender.RemoteEndPoint.ToString());*/
                    /*
                    // Encode the data string into a byte array.  
                    byte[] msg = Encoding.ASCII.GetBytes("This is a test<EOF>");

                    // Send the data through the socket.  
                    int bytesSent = sender.Send(msg);
                    */
                    // Receive the response from the remote device.  
                    /*int bytesRec = sender.Receive(bytes);
                    Console.WriteLine("Echoed test = {0}",
                        Encoding.ASCII.GetString(bytes, 0, bytesRec));
                    */
                    /*
                    bytesRec = sender.Receive(bytes);
                    Console.WriteLine("Echoed test = {0}",
                        Encoding.ASCII.GetString(bytes, 0, bytesRec));
                    */
                    /*

                    Thread.Sleep(5000);


                }*/

                    // Release the socket.  
                    sender.Shutdown(SocketShutdown.Both);
                    sender.Close();

                } catch (ArgumentNullException ane) {
                    Console.WriteLine("ArgumentNullException : {0}", ane.ToString());
                } catch (SocketException se) {
                    Console.WriteLine("SocketException : {0}", se.ToString());
                } catch (Exception e) {
                    Console.WriteLine("Unexpected exception : {0}", e.ToString());
                }

            } catch (Exception e) {
                Console.WriteLine(e.ToString());
            }

            Thread.Sleep(5000);
        }

    }

    public static int Main(String[] args) {
        SynchronousSocketClient sc = new SynchronousSocketClient();

        var task_generateMsg = Task.Run(
            (sc.threadForGeneratingMessages));

        sc.StartClient();
        return 0;
    }
}

How could these code be written do be more efficient? Is this safe, or are there any problems with these code (besides efficiency)?


Edit:
I have a problem with the code:
sometimes receivingThread is looping without halting at handler/sender.Receive() function, it seems to read 0 bytes even though other side does not send anything. How can this be corrected?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文