使用命名管道 WinAPI 进行异步 I/O

发布于 2024-09-08 21:02:30 字数 3156 浏览 5 评论 0原文

好的,我问了一些关于尝试完成我想做的事情的不同方面的问题。这次我在从命名管道读取数据时遇到了大问题。我想如果我能正确设置的话,我已经收集了足够的信息来完成我正在从事的项目。我将在下面包含所有相关代码,但我的任务是:从我没有编写的程序中读取输出(连续)并将其发布到 WinAPI。所以我的问题是,我刚刚从匿名管道切换到命名管道,并且在尝试正确设置它们以便检索信息时遇到问题。我有一个基于 MSDN 示例的框架设置。

#define WAIT_TIME 2 // 2s
#define INSTANCES 4 // Number of threads
#define CONNECT_STATE 0
#define READ_STATE 1
#define WRITE_STATE 2
#define WORLDRD 0
#define WORLDWR 1
#define WORLDINRD 2
#define WORLDINWR 3
#define BUFSIZE 0x1000 // Buffer size 4096 (in bytes)
#define PIPE_TIMEOUT 0x1388 // Timeout 5000 (in ms)

void Arc_Redirect::createProcesses()
{
TCHAR programName[]=TEXT("EXEC_PROGRAM.exe");
PROCESS_INFORMATION pi; 
STARTUPINFO si;
BOOL bSuccess = FALSE; 

ZeroMemory(hEvents,(sizeof(hEvents)*INSTANCES));
ZeroMemory(outStd,(sizeof(PIPE_HANDLES)*INSTANCES));

// Prep pipes
for(int i=0;i<INSTANCES;i++)
{
    hEvents[i] = ::CreateEvent(NULL, TRUE, FALSE, NULL);

    if(hEvents[i] == NULL)
        throw "Could not init program!";

    outStd[i].o1.hEvent = hEvents[i];

    outStd[i].hPipeInst = ::CreateNamedPipe(
        TEXT("\\\\.\\pipe\\arcworld"), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES, BUFSIZE*sizeof(TCHAR), BUFSIZE*sizeof(TCHAR), PIPE_TIMEOUT, NULL);

    if(outStd[i].hPipeInst == INVALID_HANDLE_VALUE)
        throw "Could not init program!";

    outStd[i].pendingIO = getState(outStd[i].hPipeInst,&outStd[i].o1);

    outStd[i].dwState = outStd[i].pendingIO ?
        CONNECT_STATE : READ_STATE;
}

// Set stuff up
ZeroMemory( &pi, sizeof(PROCESS_INFORMATION));
ZeroMemory( &si, sizeof(STARTUPINFO));
si.cb = sizeof(STARTUPINFO); 
si.hStdError = outStd[WORLDRD].hPipeInst;
si.hStdOutput = outStd[WORLDRD].hPipeInst;
si.hStdInput = outStd[WORLDINWR].hPipeInst;
si.dwFlags |= STARTF_USESHOWWINDOW|STARTF_USESTDHANDLES|FILE_FLAG_OVERLAPPED;

    // Start our process with the si info
CreateProcess(programName,NULL,NULL,NULL,FALSE,0,NULL,NULL,&si,&pi);
}

BOOL Arc_Redirect::getState(HANDLE hPipe, LPOVERLAPPED lpo)
{
    BOOL connected, pendingIO = FALSE;

    // Overlap connection for this pipe
    connected = ::ConnectNamedPipe(hPipe,lpo);

    if(connected)
        throw "ConnectNamedPipe(); failed!";

    switch(GetLastError())
    {
        case ERROR_IO_PENDING:
            pendingIO = TRUE;
        break;
        case ERROR_PIPE_CONNECTED:
            if(SetEvent(lpo->hEvent))
                break;  
        default:
            throw "ConnectNamedPipe(); failed!";
        break;
    }

    return pendingIO;
}

outStd[INSTANCES] 被定义为 PIPE_HANDLES (自定义结构),它位于下面,

typedef struct
{
    HANDLE hPipeInst;
    OVERLAPPED o1;
    TCHAR chReq[BUFSIZE];
    TCHAR chReply[BUFSIZE];
    DWORD dwRead;
    DWORD dwWritten;
    DWORD dwState;
    DWORD cbRet;
    BOOL pendingIO;
} PIPE_HANDLES, *LPSTDPIPE;

现在从这里开始我有点迷失了。我不知道该去哪里。我尝试使用 MSDN 示例中的循环,但它不能正常工作,无法满足我的要求。我需要获取管道的读取端并检索信息(再次连续),同时打开写入端,以便每当我需要写入它时。有人有什么想法吗?我一直在尝试像使用匿名管道一样执行 ReadFile() ,但它似乎没有以相同的方式工作。

另外,请注意:代码有点草率,因为我一直在使用它,所以我很抱歉。当我让它正常工作后,我肯定会清理它。

Ok, I have asked a few questions about different facets of trying to accomplish what I want to do. This time I'm having big issues just reading from a named pipe. I think I have harvested enough information to possibly complete the project I am working on if I can set this up properly. I will include all relevant code below, but my mission is this: read the output (continuously) from a program I did not write and post it to the WinAPI. So my problem is that I have just switched from anonymous pipes to named pipes and I am having issues trying to properly set them up so I can retrieve information. I have a framework setup based off of an example from the MSDN.

#define WAIT_TIME 2 // 2s
#define INSTANCES 4 // Number of threads
#define CONNECT_STATE 0
#define READ_STATE 1
#define WRITE_STATE 2
#define WORLDRD 0
#define WORLDWR 1
#define WORLDINRD 2
#define WORLDINWR 3
#define BUFSIZE 0x1000 // Buffer size 4096 (in bytes)
#define PIPE_TIMEOUT 0x1388 // Timeout 5000 (in ms)

void Arc_Redirect::createProcesses()
{
TCHAR programName[]=TEXT("EXEC_PROGRAM.exe");
PROCESS_INFORMATION pi; 
STARTUPINFO si;
BOOL bSuccess = FALSE; 

ZeroMemory(hEvents,(sizeof(hEvents)*INSTANCES));
ZeroMemory(outStd,(sizeof(PIPE_HANDLES)*INSTANCES));

// Prep pipes
for(int i=0;i<INSTANCES;i++)
{
    hEvents[i] = ::CreateEvent(NULL, TRUE, FALSE, NULL);

    if(hEvents[i] == NULL)
        throw "Could not init program!";

    outStd[i].o1.hEvent = hEvents[i];

    outStd[i].hPipeInst = ::CreateNamedPipe(
        TEXT("\\\\.\\pipe\\arcworld"), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES, BUFSIZE*sizeof(TCHAR), BUFSIZE*sizeof(TCHAR), PIPE_TIMEOUT, NULL);

    if(outStd[i].hPipeInst == INVALID_HANDLE_VALUE)
        throw "Could not init program!";

    outStd[i].pendingIO = getState(outStd[i].hPipeInst,&outStd[i].o1);

    outStd[i].dwState = outStd[i].pendingIO ?
        CONNECT_STATE : READ_STATE;
}

// Set stuff up
ZeroMemory( &pi, sizeof(PROCESS_INFORMATION));
ZeroMemory( &si, sizeof(STARTUPINFO));
si.cb = sizeof(STARTUPINFO); 
si.hStdError = outStd[WORLDRD].hPipeInst;
si.hStdOutput = outStd[WORLDRD].hPipeInst;
si.hStdInput = outStd[WORLDINWR].hPipeInst;
si.dwFlags |= STARTF_USESHOWWINDOW|STARTF_USESTDHANDLES|FILE_FLAG_OVERLAPPED;

    // Start our process with the si info
CreateProcess(programName,NULL,NULL,NULL,FALSE,0,NULL,NULL,&si,&pi);
}

BOOL Arc_Redirect::getState(HANDLE hPipe, LPOVERLAPPED lpo)
{
    BOOL connected, pendingIO = FALSE;

    // Overlap connection for this pipe
    connected = ::ConnectNamedPipe(hPipe,lpo);

    if(connected)
        throw "ConnectNamedPipe(); failed!";

    switch(GetLastError())
    {
        case ERROR_IO_PENDING:
            pendingIO = TRUE;
        break;
        case ERROR_PIPE_CONNECTED:
            if(SetEvent(lpo->hEvent))
                break;  
        default:
            throw "ConnectNamedPipe(); failed!";
        break;
    }

    return pendingIO;
}

outStd[INSTANCES] is defined as PIPE_HANDLES (a custom struct) which is below

typedef struct
{
    HANDLE hPipeInst;
    OVERLAPPED o1;
    TCHAR chReq[BUFSIZE];
    TCHAR chReply[BUFSIZE];
    DWORD dwRead;
    DWORD dwWritten;
    DWORD dwState;
    DWORD cbRet;
    BOOL pendingIO;
} PIPE_HANDLES, *LPSTDPIPE;

Now from here is where I am getting a bit lost. I'm not sure where to go. I tried using the loop in the MSDN example, but it didn't work properly for what I am looking to do. I need to take the read end of the pipe and retrieve the information (again, continuously) while having a write end opened as well for whenever I may need to write to it. Does anyone have any ideas? I have been trying to do a ReadFile() as I would do with an anonymous pipe, but it does not appear to be working in the same fashion.

Also, please note: The code is a bit sloppy because I have been working with it, so I apologize. I will definitely be cleaning it up after I get this to function properly.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

浅听莫相离 2024-09-15 21:02:30

您应该有两种重叠的结构,一种用于读取,一种用于写入。此外,当您想要关闭管道时,每个管道还需要一个事件句柄,当您想要中止所有管道(并关闭应用程序)时,还需要一个事件句柄。您可以为所有管道当前涉及的每个操作使用一个 WaitForMultipleObjects,或者在两个线程中将读取和写入分开,每个线程中都有一个 WFMO。我只会使用一个线程,因为关闭管道会更简单(否则您需要在管道句柄上进行一些引用计数,并仅在引用计数降至零时才关闭它)。

当您收到一个事件时,然后处理它,并在您刚刚处理的事件之后的数组中的所有句柄上尝试使用 0 秒的 WFMO。这样管道就不会缺水。当 WFMO 经过 0 秒后,从头开始重复正常的 WFMO。

如果需要高并发性,则在单独的线程中处理事件,并忽略 WFMO 中当前处理的句柄。然而,跟踪所有句柄就会变得有点复杂。

You should have two OVERLAPPED structures, one for reading and one for writing. Also you need one event handle per pipe for when you want to close the pipe, and one more event when you want to abort all (and close application). You can have one WaitForMultipleObjects for every operation that all pipes are currently involved with, or separate reading from writing in two threads with one WFMO in each. I would go with only one thread, because closing the pipe is then simpler (otherwise you need to have some reference counting on the pipe handle and close it only when reference count drops to zero).

When you get one event then process it, and try WFMO with 0 seconds on all handles that were in the array after the one you just processed. This way no pipe will get starved. When 0 second WFMO elapses repeat normal WFMO from beginning.

If you need high concurrency then process events in separate threads, and omit the currently processing handles from WFMO. However, tracking all the handles then gets a little complicated.

幸福还没到 2024-09-15 21:02:30

您是否尝试过在 CreateNamedPipe 调用中传递 PIPE_NOWAIT 而不是 PIPE_WAIT ?这将允许 ReadFile 和 WriteFile 成为非阻塞的。

或者,您是否尝试过使用异步IO?您正在传递 FILE_FLAG_OVERLAPPED 标志,因此这应该可行。如果您尝试过,您遇到了什么问题?

Have you tried passing PIPE_NOWAIT instead of PIPE_WAIT in the CreateNamedPipe call? This will allow ReadFile and WriteFile to be non-blocking.

Alternatively, have you tried using async IO? You're passing the FILE_FLAG_OVERLAPPED flag, so this should work. If you've tried it, what problems did you encounter?

花开雨落又逢春i 2024-09-15 21:02:30

在 Linux 世界中,一个程序可以通过 write/fwrite 调用写入命名管道,另一个程序可以通过 read/fread() 读取它。

读/写操作中必须使用命名管道的完整路径。

In the linux world, one program can write to a named pipe through write/fwrite calls and another program can read it through read/fread().

The FULL path of the named pipe must be used in read/write operations.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文