IO 完成端口:WSARecv() 如何工作?

发布于 2024-08-12 02:46:31 字数 2801 浏览 5 评论 0原文

我想使用工作线程池和 IO 完成端口编写服务器。服务器应该在多个客户端之间处理和转发消息。 “每个客户端”数据位于 ClientContext 类中。此类实例之间的数据使用工作线程进行交换。我认为这是一个典型的场景。

但是,这些 IO 完成端口有两个问题。

(1) 第一个问题是服务器基本上从客户端接收数据,但我永远不知道是否收到完整的消息。事实上,WSAGetLastError() 总是返回 WSARecv() 仍处于挂起状态。我尝试使用 WaitForMultipleObjects() 等待事件 OVERLAPPED.hEvent。然而,它永远阻塞,即 WSARecv() 在我的程序中永远不会完成。 我的目标是在进一步处理开始之前绝对确保已收到整个消息。我的消息的标头中有一个“消息长度”字段,但我真的不知道如何将它与 IOCP 函数参数一起使用。

(2) 如果下面代码片段中的WSARecv()被注释掉,程序仍然接收数据。这意味着什么?这是否意味着我根本不需要调用 WSARecv() ?我无法通过这些 IO 完成端口获得确定性行为。 感谢您的帮助!

while(WaitForSingleObject(module_com->m_shutdown_event, 0)!= WAIT_OBJECT_0)
{

    dequeue_result = GetQueuedCompletionStatus(module_com->m_h_io_completion_port,
                                               &transfered_bytes,
                                               (LPDWORD)&lp_completion_key,
                                               &p_ol,
                                               INFINITE);
     if (lp_completion_key == NULL)
     {
         //Shutting down
         break;
     }

     //Get client context
     current_context = (ClientContext *)lp_completion_key;

     //IOCP error
     if(dequeue_result == FALSE)
     {
         //... do some error handling...
     }
     else
     {   
         // 'per client' data
         thread_state = current_context->GetState();
         wsa_recv_buf = current_context->GetWSABUFPtr();

         // 'per call' data
         this_overlapped = current_context->GetOVERLAPPEDPtr();
     }

     while(thread_state != STATE_DONE)
     {
         switch(thread_state)
         {
         case STATE_INIT:

             //Check if completion packet has been posted by internal function or by WSARecv(), WSASend()
             if(transfered_bytes > 0)
             {
                 dwFlags = 0;
                 transf_now = 0;
                 transf_result = WSARecv(current_context->GetSocket(),
                                         wsa_recv_buf,
                                         1,
                                         &transf_now,
                                         &dwFlags,
                                         this_overlapped,
                                         NULL);

                 if (SOCKET_ERROR == transf_result && WSAGetLastError() != WSA_IO_PENDING)
                 {   
                     //...error handling...
                     break;
                 }

                 // put received message into a message queue

             }
             else // (transfered_bytes == 0)
             {
                 // Another context passed data to this context
                 // and notified it via PostQueuedCompletionStatus().
             }
             break;
         }
     }
 }

I want to write a server using a pool of worker threads and an IO completion port. The server should processes and forwards messages between multiple clients. The 'per client' data is in a class ClientContext. Data between instances of this class are exchanged using the worker threads. I think this is a typical scenario.

However, I have two problems with those IO completion ports.

(1) The first problem is that the server basically receives data from clients but I never know if a complete message was received. In fact WSAGetLastError() always returns that WSARecv() is still pending. I tried to wait for the event OVERLAPPED.hEvent with WaitForMultipleObjects(). However, it blocks forever, i.e WSARecv() never completes in my program.
My goal is to be absolutely sure that the whole message has been received before further processing starts. My message has a 'message length' field in its header, but I don't really see how to use it with the IOCP function parameters.

(2) If WSARecv() is commented out in the code snippet below, the program still receives data. What does that mean? Does it mean that I don't need to call WSARecv() at all? I am not able to get a deterministic behaviour with those IO completion ports.
Thanks for your help!

while(WaitForSingleObject(module_com->m_shutdown_event, 0)!= WAIT_OBJECT_0)
{

    dequeue_result = GetQueuedCompletionStatus(module_com->m_h_io_completion_port,
                                               &transfered_bytes,
                                               (LPDWORD)&lp_completion_key,
                                               &p_ol,
                                               INFINITE);
     if (lp_completion_key == NULL)
     {
         //Shutting down
         break;
     }

     //Get client context
     current_context = (ClientContext *)lp_completion_key;

     //IOCP error
     if(dequeue_result == FALSE)
     {
         //... do some error handling...
     }
     else
     {   
         // 'per client' data
         thread_state = current_context->GetState();
         wsa_recv_buf = current_context->GetWSABUFPtr();

         // 'per call' data
         this_overlapped = current_context->GetOVERLAPPEDPtr();
     }

     while(thread_state != STATE_DONE)
     {
         switch(thread_state)
         {
         case STATE_INIT:

             //Check if completion packet has been posted by internal function or by WSARecv(), WSASend()
             if(transfered_bytes > 0)
             {
                 dwFlags = 0;
                 transf_now = 0;
                 transf_result = WSARecv(current_context->GetSocket(),
                                         wsa_recv_buf,
                                         1,
                                         &transf_now,
                                         &dwFlags,
                                         this_overlapped,
                                         NULL);

                 if (SOCKET_ERROR == transf_result && WSAGetLastError() != WSA_IO_PENDING)
                 {   
                     //...error handling...
                     break;
                 }

                 // put received message into a message queue

             }
             else // (transfered_bytes == 0)
             {
                 // Another context passed data to this context
                 // and notified it via PostQueuedCompletionStatus().
             }
             break;
         }
     }
 }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

浅浅 2024-08-19 02:46:31

(1) 第一个问题是
服务器基本上接收数据
客户,但我不知道是否完整
已收到消息。

您的 recv 调用可以返回从 1 个字节到整个“消息”的任何位置。您需要包含逻辑,该逻辑在有足够的数据时计算出完整“消息”的长度,然后在您实际拥有完整“消息”时计算出。虽然您没有足够的数据,但您可以使用相同的内存缓冲区重新发出 recv 调用,但使用更新的 WSABUF 结构,该结构指向您已接收的数据的末尾。通过这种方式,您可以在缓冲区中积累完整的消息,而无需在每次 recv 调用完成后复制数据。

(2) 如果 WSARecv() 被注释掉
下面的代码片段,程序
仍然接收数据。那是什么
意思是?这是否意味着我不需要
到底要调用 WSARecv() 吗?

我希望这只是意味着您的代码中存在错误...

请注意,从可扩展性的角度来看,不要在重叠结构中使用事件,而是将套接字与 IOCP 相关联并允许完成是“更好”的被发布到处理您的完成的线程池。

我有一个免费的 IOCP 客户端/服务器框架,可以从 此处 获取可能会给你一些提示;以及有关 CodeProject 的一系列文章(第一篇位于:http://www.codeproject. com/KB/IP/jbsocketserver1.aspx),我在其中处理整个“读取完整消息”问题(请参阅“分块字节流”)。

(1) The first problem is that the
server basically receives data from
clients but I never know if a complete
message was received.

Your recv calls can return anywhere from 1 byte to the whole 'message'. You need to include logic that works out when it has enough data to work out the length of the complete 'message' and then work out when you actually have a complete 'message'. Whilst you do NOT have enough data you can reissue a recv call using the same memory buffer but with an updated WSABUF structure that points to the end of the data that you have already recvd. In that way you can accumulate a full message in your buffer without needing to copy data after every recv call completes.

(2) If WSARecv() is commented out in
the code snippet below, the program
still receives data. What does that
mean? Does it mean that I don't need
to call WSARecv() at all?

I expect it just means you have a bug in your code...

Note that it's 'better' from a scalability point of view not to use the event in the overlapped structure and instead to associate the socket with the IOCP and allow the completions to be posted to a thread pool that deals with your completions.

I have a free IOCP client/server framework available from here which may give you some hints; and a series of articles on CodeProject (first one is here: http://www.codeproject.com/KB/IP/jbsocketserver1.aspx) where I deal with the whole 'reading complete messages' problem (see "Chunking the byte stream").

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文