Using WaitForMultipleObjects on an anonymous pipe (MSDN)

Posted 2025-02-08 07:32:06

I have an issue using WaitForMultipleObjects on an anonymous pipe. My goal is to wait on a pipe to be written and on another object at the same time. To be more precise, I am using pipes for local IPC. I saw an approach on Stack Overflow where you create a handle with CreateEvent, set this event after every WriteFile on the pipe, and reset it after every ReadFile. Here is a simple unit test that shows this approach is not correct.

#include <windows.h>
#include <cstdio>   // printf
#include <iostream>

HANDLE event_on_pipe;
HANDLE endpoint_pipe[2];

DWORD WINAPI WThread_Pipe( LPVOID lpParam )
{

        ULONG buffer = 1234;
        DWORD bytes;
        int write;
        int count = 10;
        while(count --)
        {
            WriteFile(endpoint_pipe[1],(char*)&buffer, sizeof(buffer),&bytes,NULL);
            if ( !SetEvent(event_on_pipe) )
            {
                printf("SetEvent failed (%lu)\n", GetLastError());
                return 0;
            }
        }
        return 0;
}



int main()
{
    HANDLE Thread_Pipe;
    DWORD ThreadID_Pipe;
    event_on_pipe = CreateEvent(NULL,FALSE,FALSE,NULL);


    CreatePipe(&endpoint_pipe[0],&endpoint_pipe[1],NULL,0);

    
    HANDLE lphandles[1];

    lphandles[0] = event_on_pipe;

    Thread_Pipe = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE) WThread_Pipe, NULL, 0, &ThreadID_Pipe);


    ULONG buffer;
    DWORD bytes;
    while(1)
    {
        DWORD obj = WaitForMultipleObjects(1,lphandles,FALSE,INFINITE);

        if(obj == WAIT_OBJECT_0) 
        {
            ReadFile(endpoint_pipe[0], &buffer,sizeof(ULONG),&bytes,NULL);
            ResetEvent(event_on_pipe);
            Sleep(1000);
        }


        std::cout << buffer<<std::endl;
    }




    return 0;
}

I expect to see 1234 printed 10 times on the console, but I see it only twice.
The main problem is that while the reader sleeps for one second after the ReadFile operation, the writer thread writes to the pipe and sets the event 9 times. So 9 ReadFile operations are still pending, but because setting an already-set event 9 times counts as a single set, WaitForMultipleObjects is signaled only once.

I want an efficient way to signal WaitForMultipleObjects when a pipe is written (for local inter-process communication). It doesn't need to be an anonymous pipe; it could be something else that acts like a pipe to exchange messages between threads. So if someone knows how this could be done, please help.

Comments (1)

眼中杀气 2025-02-15 07:32:06

As you've found, an Event has only two states: signaled and not signaled. It has no capability to keep track of how many times it's been signaled.
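To make the difference concrete, here is a minimal portable sketch that only models the two behaviours; it does not call the Win32 API. The names ModelEvent, ModelSemaphore, wakeups_after and check_model are hypothetical and exist only for this illustration. The assumption is that a flag stands in for an auto-reset event and a counter stands in for a semaphore.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical model of an auto-reset event: one boolean flag.
class ModelEvent {
    std::mutex m;
    bool signaled = false;
public:
    void signal() {                 // plays the role of SetEvent
        std::lock_guard<std::mutex> lock(m);
        signaled = true;            // setting an already-set flag changes nothing
    }
    bool try_wait() {               // plays the role of a zero-timeout wait
        std::lock_guard<std::mutex> lock(m);
        if (!signaled) return false;
        signaled = false;           // auto-reset: one successful wait clears it
        return true;
    }
};

// Hypothetical model of a semaphore: a counter.
class ModelSemaphore {
    std::mutex m;
    long count = 0;
public:
    void signal() {                 // plays the role of ReleaseSemaphore(h, 1, NULL)
        std::lock_guard<std::mutex> lock(m);
        ++count;                    // every signal is remembered
    }
    bool try_wait() {
        std::lock_guard<std::mutex> lock(m);
        if (count == 0) return false;
        --count;                    // each successful wait consumes one signal
        return true;
    }
};

// Signal `signals` times before the consumer runs, then count how many
// waits succeed. This mirrors the writer racing ahead of a sleeping reader.
template <class Sync>
int wakeups_after(int signals) {
    Sync s;
    for (int i = 0; i < signals; ++i) s.signal();
    int woken = 0;
    while (s.try_wait()) ++woken;
    return woken;
}

// Nine signals collapse into one wake-up for the event model,
// but produce nine wake-ups for the semaphore model.
void check_model() {
    assert(wakeups_after<ModelEvent>(9) == 1);
    assert(wakeups_after<ModelSemaphore>(9) == 9);
}
```

Calling check_model() from a test or from main exercises both models. The real event and semaphore objects add blocking waits and kernel handles, which this sketch leaves out on purpose.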

Instead of an event, I'd use a semaphore, something like this:

#include <windows.h>
#include <iostream>

HANDLE event_on_pipe;
HANDLE endpoint_pipe[2];
ULONG end = -1;

DWORD WINAPI WThread_Pipe(LPVOID lpParam) {

    ULONG buffer = 1234;

    DWORD bytes;
    for (int count = 0; count < 10; count++) {
        WriteFile(endpoint_pipe[1], (char*)&buffer, sizeof(buffer), &bytes, NULL);
        ReleaseSemaphore(event_on_pipe, 1, NULL);
    }
    // write a unique value to the pipe to signal the end of data:
    WriteFile(endpoint_pipe[1], (char*)&end, sizeof(end), &bytes, NULL);
    ReleaseSemaphore(event_on_pipe, 1, NULL);
    return 0;
}

int main() {
    HANDLE Thread_Pipe;
    DWORD ThreadID_Pipe;
    event_on_pipe = CreateSemaphore(NULL, 0, 100, NULL);

    CreatePipe(&endpoint_pipe[0], &endpoint_pipe[1], NULL, 0);

    HANDLE lphandles[1];

    lphandles[0] = event_on_pipe;

    Thread_Pipe = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)WThread_Pipe, NULL, 0, &ThreadID_Pipe);

    ULONG buffer;
    DWORD bytes;
    while (WAIT_OBJECT_0 == WaitForMultipleObjects(1, lphandles, FALSE, INFINITE)) {
        ReadFile(endpoint_pipe[0], &buffer, sizeof(buffer), &bytes, NULL);
        if (buffer == end) {
            break;
        }
        std::cout << buffer << std::endl;
        Sleep(1000);
    }

    return 0;
}

I've tried to keep most of the code pretty similar to how you had it, keeping changes to (more or less) the minimum necessary to get it to work.

If I were starting from scratch, and just wanted to transmit a series of numbers (or other things, all of the same type) between threads in a single process, I'd probably use a thread-safe queue instead.

If we wanted to use Windows primitives, the queue might look something like this:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue {
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) {
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) {
        WaitForSingleObject(space_avail, INFINITE);
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    bool pop(T &dest, unsigned maxWaitMs = 100) {
        if (WAIT_OBJECT_0 != WaitForSingleObject(data_avail, maxWaitMs)) {
            return false;
        }
        EnterCriticalSection(&mutex);
        dest = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return true;
    }

    ~queue() {
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif

...and the code using it would look something like this:

#include "queue.hpp"
#include <iostream>
#include <thread>
#include <chrono>

void sender(queue<int> &dest) {
    for (int i = 0; i < 10; i++)
        dest.push(i);
}

int main() {
    using namespace std::literals;

    queue<int> q;

    auto t = std::thread([&] { sender(q); });

    int val;

    while (q.pop(val)) {
        std::cout << val << std::endl;
        std::this_thread::sleep_for(1s);
    }

    t.join();
}

You could also write the thread-safe queue using native C++ primitives instead. For one example, Anthony Williams outlined a thread-safe queue using a condition variable some time ago:

https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

That changes the interface slightly, but only slightly (and you could modify it to have an interface about like I've shown above, if you preferred).
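As a rough idea of what such a queue might look like with the push/pop-with-timeout interface used earlier, here is a minimal sketch built on std::mutex and std::condition_variable. It is not Anthony Williams's code. The names cv_queue and sum_via_queue are hypothetical, and unlike the semaphore-based version this sketch is unbounded, so push never blocks.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical unbounded thread-safe queue using standard C++ primitives.
template <class T>
class cv_queue {
    std::mutex m;                       // protects items
    std::condition_variable data_avail; // signaled when an item is pushed
    std::deque<T> items;
public:
    void push(T data) {
        {
            std::lock_guard<std::mutex> lock(m);
            items.push_back(std::move(data));
        }
        data_avail.notify_one();        // wake one waiting consumer
    }

    // Waits up to max_wait for an item. Returns false on timeout and
    // leaves dest untouched in that case.
    bool pop(T &dest,
             std::chrono::milliseconds max_wait = std::chrono::milliseconds(100)) {
        std::unique_lock<std::mutex> lock(m);
        // The predicate guards against spurious wake-ups.
        if (!data_avail.wait_for(lock, max_wait,
                                 [this] { return !items.empty(); })) {
            return false;
        }
        dest = std::move(items.front());
        items.pop_front();
        return true;
    }
};

// Hypothetical helper: a producer thread pushes 0..n-1, then the caller
// drains the queue and returns the sum of everything it received.
int sum_via_queue(int n) {
    assert(n >= 0);
    cv_queue<int> q;
    std::thread producer([&q, n] {
        for (int i = 0; i < n; ++i) q.push(i);
    });
    producer.join();                    // every item is queued before draining
    int sum = 0;
    int val = 0;
    while (q.pop(val, std::chrono::milliseconds(0))) sum += val;
    return sum;
}
```

Because every pushed item stays in the deque until it is popped, a slow consumer never loses messages, which is the property the event-based version lacked. Note that a condition variable is not a waitable handle, so this form cannot be passed to WaitForMultipleObjects; it fits when the consumer only waits on the queue.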
