尝试使用 Win32 线程进行异步 I/O
我正在为Windows编写一个串口软件。为了提高性能,我尝试将例程转换为使用异步 I/O。我已经编写了代码并且运行得相当好,但我在这方面是半初学者,我想进一步提高程序的性能。在程序的压力测试期间(即以高波特率尽可能快地向端口突发数据/从端口突发数据),CPU 负载变得相当高。
如果有人有 Windows 中异步 I/O 和多线程的经验,如果您能看一下我的程序,我将不胜感激。我主要担心两个问题:
异步 I/O 是否正确实现?我在网上找到了一些相当可靠的来源,建议您可以通过在最后用您自己的数据实现您自己的 OVERLAPPED 结构来将用户数据传递给回调函数。这似乎工作得很好,但对我来说确实有点“hackish”。另外,当我从同步/轮询转换为异步/回调时,程序的性能并没有太大提高,这让我怀疑我做错了什么。
使用 STL std::deque 作为 FIFO 数据缓冲区是否合理?由于当前编写的程序,我只允许一次接收 1 个字节的数据,然后才必须对其进行处理。因为我不知道我会收到多少数据,它可能是无穷无尽的。我认为,当必须分配数据时,一次 1 字节会在双端队列后面产生缓慢的行为。而且我也不相信双端队列是线程安全的(我应该吗?)。 如果使用 STL 双端队列不太明智,是否有更好的数据类型可供使用的建议?基于静态数组的环形缓冲区?
也非常欢迎有关代码的任何其他反馈。
串行例程的实现使我有一个名为“Comport”的父类,它处理与串行 I/O 相关的所有内容。我从这个类继承了另一个名为“ThreadedComport”的类,它是一个多线程版本。
ThreadedComport 类(其相关部分)
class ThreadedComport : public Comport
{
private:
HANDLE _hthread_port; /* thread handle */
HANDLE _hmutex_port; /* COM port access */
HANDLE _hmutex_send; /* send buffer access */
HANDLE _hmutex_rec; /* rec buffer access */
deque<uint8> _send_buf;
deque<uint8> _rec_buf;
uint16 _data_sent;
uint16 _data_received;
HANDLE _hevent_kill_thread;
HANDLE _hevent_open;
HANDLE _hevent_close;
HANDLE _hevent_write_done;
HANDLE _hevent_read_done;
HANDLE _hevent_ext_send; /* notifies external thread */
HANDLE _hevent_ext_receive; /* notifies external thread */
typedef struct
{
OVERLAPPED overlapped;
ThreadedComport* caller; /* add user data to struct */
} OVERLAPPED_overlap;
OVERLAPPED_overlap _send_overlapped;
OVERLAPPED_overlap _rec_overlapped;
uint8* _write_data;
uint8 _read_data;
DWORD _bytes_read;
static DWORD WINAPI _tranceiver_thread (LPVOID param);
void _send_data (void);
void _receive_data (void);
DWORD _wait_for_io (void);
static void WINAPI _send_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped);
static void WINAPI _receive_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped);
};
通过 CreateThread() 创建的主线程例程:
DWORD WINAPI ThreadedComport::_tranceiver_thread (LPVOID param)
{
ThreadedComport* caller = (ThreadedComport*) param;
HANDLE handle_array [3] =
{
caller->_hevent_kill_thread, /* WAIT_OBJECT_0 */
caller->_hevent_open, /* WAIT_OBJECT_1 */
caller->_hevent_close /* WAIT_OBJECT_2 */
};
DWORD result;
do
{
/* wait for anything to happen */
result = WaitForMultipleObjects(3,
handle_array,
false, /* dont wait for all */
INFINITE);
if(result == WAIT_OBJECT_1 ) /* open? */
{
do /* while port is open, work */
{
caller->_send_data();
caller->_receive_data();
result = caller->_wait_for_io(); /* will wait for the same 3 as in handle_array above,
plus all read/write specific events */
} while (result != WAIT_OBJECT_0 && /* while not kill thread */
result != WAIT_OBJECT_2); /* while not close port */
}
else if(result == WAIT_OBJECT_2) /* close? */
{
; /* do nothing */
}
} while (result != WAIT_OBJECT_0); /* kill thread? */
return 0;
}
它依次调用以下三个函数:
void ThreadedComport::_send_data (void)
{
uint32 send_buf_size;
if(_send_buf.size() != 0) // anything to send?
{
WaitForSingleObject(_hmutex_port, INFINITE);
if(_is_open) // double-check port
{
bool result;
WaitForSingleObject(_hmutex_send, INFINITE);
_data_sent = 0;
send_buf_size = _send_buf.size();
if(send_buf_size > (uint32)_MAX_MESSAGE_LENGTH)
{
send_buf_size = _MAX_MESSAGE_LENGTH;
}
_write_data = new uint8 [send_buf_size];
for(uint32 i=0; i<send_buf_size; i++)
{
_write_data[i] = _send_buf.front();
_send_buf.pop_front();
}
_send_buf.clear();
ReleaseMutex(_hmutex_send);
result = WriteFileEx (_hcom, // handle to output file
(void*)_write_data, // pointer to input buffer
send_buf_size, // number of bytes to write
(LPOVERLAPPED)&_send_overlapped, // pointer to async. i/o data
(LPOVERLAPPED_COMPLETION_ROUTINE )&_send_callback);
SleepEx(INFINITE, true); // Allow callback to come
if(result == false)
{
// error handling here
}
} // if(_is_open)
ReleaseMutex(_hmutex_port);
}
else /* nothing to send */
{
SetEvent(_hevent_write_done); // Skip write
}
}
void ThreadedComport::_receive_data (void)
{
WaitForSingleObject(_hmutex_port, INFINITE);
if(_is_open)
{
BOOL result;
_bytes_read = 0;
result = ReadFileEx (_hcom, // handle to output file
(void*)&_read_data, // pointer to input buffer
1, // number of bytes to read
(OVERLAPPED*)&_rec_overlapped, // pointer to async. i/o data
(LPOVERLAPPED_COMPLETION_ROUTINE )&_receive_callback);
SleepEx(INFINITE, true); // Allow callback to come
if(result == FALSE)
{
DWORD last_error = GetLastError();
if(last_error == ERROR_OPERATION_ABORTED) // disconnected ?
{
close(); // close the port
}
}
}
ReleaseMutex(_hmutex_port);
}
DWORD ThreadedComport::_wait_for_io (void)
{
DWORD result;
bool is_write_done = false;
bool is_read_done = false;
HANDLE handle_array [5] =
{
_hevent_kill_thread,
_hevent_open,
_hevent_close,
_hevent_write_done,
_hevent_read_done
};
do /* COM port message pump running until sending / receiving is done */
{
result = WaitForMultipleObjects(5,
handle_array,
false, /* dont wait for all */
INFINITE);
if(result <= WAIT_OBJECT_2)
{
break; /* abort */
}
else if(result == WAIT_OBJECT_3) /* write done */
{
is_write_done = true;
SetEvent(_hevent_ext_send);
}
else if(result == WAIT_OBJECT_4) /* read done */
{
is_read_done = true;
if(_bytes_read > 0)
{
uint32 errors = 0;
WaitForSingleObject(_hmutex_rec, INFINITE);
_rec_buf.push_back((uint8)_read_data);
_data_received += _bytes_read;
while((uint16)_rec_buf.size() > _MAX_MESSAGE_LENGTH)
{
_rec_buf.pop_front();
}
ReleaseMutex(_hmutex_rec);
_bytes_read = 0;
ClearCommError(_hcom, &errors, NULL);
SetEvent(_hevent_ext_receive);
}
}
} while(!is_write_done || !is_read_done);
return result;
}
异步 I/O 回调函数:
void WINAPI ThreadedComport::_send_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped)
{
ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;
if(dwErrorCode == 0) // no errors
{
if(dwNumberOfBytesTransfered > 0)
{
_this->_data_sent = dwNumberOfBytesTransfered;
}
}
delete [] _this->_write_data; /* always clean this up */
SetEvent(lpOverlapped->hEvent);
}
void WINAPI ThreadedComport::_receive_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped)
{
if(dwErrorCode == 0) // no errors
{
if(dwNumberOfBytesTransfered > 0)
{
ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;
_this->_bytes_read = dwNumberOfBytesTransfered;
}
}
SetEvent(lpOverlapped->hEvent);
}
I'm writing a serial port software for Windows. To improve performance I'm trying to convert the routines to use asynchronous I/O. I have the code up and working fairly well, but I'm a semi-beginner at this, and I would like to improve the performance of the program further. During stress tests of the program (ie burst data to/from the port as fast as possible at high baudrate), the CPU load gets quite high.
If anyone out there has experience from asynchronous I/O and multi-threading in Windows, I'd be grateful if you could take a look at my program. I have two main concerns:
Is the asynchronous I/O implemented correctly? I found some fairly reliable source on the net suggesting that you can pass user data to the callback functions, by implementing your own OVERLAPPED struct with your own data at the end. This seems to be working just fine, but it does look a bit "hackish" to me. Also, the program's performance didn't improve all that much when I converted from synchronous/polled to asynchronous/callback, making me suspect I'm doing something wrong.
Is it sane to use STL std::deque for the FIFO data buffers? As the program is currently written, I only allow 1 byte of data to be received at a time, before it must be processed. Because I don't know how much data I will receive, it could be endless amounts. I assume this 1-byte-at-a-time will yield sluggish behaviour behind the lines of deque when it has to allocate data. And I don't trust deque to be thread-safe either (should I?).
If using STL deque isn't sane, are there any suggestions for a better data type to use? Static array-based circular ring buffer?
Any other feedback on the code is most welcome as well.
The serial routines are implemented so that I have a parent class called "Comport", which handles everything serial I/O related. From this class I inherit another class called "ThreadedComport", which is a multi-threaded version.
ThreadedComport class (relevant parts of it)
class ThreadedComport : public Comport
{
private:
HANDLE _hthread_port; /* thread handle */
HANDLE _hmutex_port; /* COM port access */
HANDLE _hmutex_send; /* send buffer access */
HANDLE _hmutex_rec; /* rec buffer access */
deque<uint8> _send_buf;
deque<uint8> _rec_buf;
uint16 _data_sent;
uint16 _data_received;
HANDLE _hevent_kill_thread;
HANDLE _hevent_open;
HANDLE _hevent_close;
HANDLE _hevent_write_done;
HANDLE _hevent_read_done;
HANDLE _hevent_ext_send; /* notifies external thread */
HANDLE _hevent_ext_receive; /* notifies external thread */
typedef struct
{
OVERLAPPED overlapped;
ThreadedComport* caller; /* add user data to struct */
} OVERLAPPED_overlap;
OVERLAPPED_overlap _send_overlapped;
OVERLAPPED_overlap _rec_overlapped;
uint8* _write_data;
uint8 _read_data;
DWORD _bytes_read;
static DWORD WINAPI _tranceiver_thread (LPVOID param);
void _send_data (void);
void _receive_data (void);
DWORD _wait_for_io (void);
static void WINAPI _send_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped);
static void WINAPI _receive_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped);
};
The main thread routine created through CreateThread():
DWORD WINAPI ThreadedComport::_tranceiver_thread (LPVOID param)
{
ThreadedComport* caller = (ThreadedComport*) param;
HANDLE handle_array [3] =
{
caller->_hevent_kill_thread, /* WAIT_OBJECT_0 */
caller->_hevent_open, /* WAIT_OBJECT_1 */
caller->_hevent_close /* WAIT_OBJECT_2 */
};
DWORD result;
do
{
/* wait for anything to happen */
result = WaitForMultipleObjects(3,
handle_array,
false, /* dont wait for all */
INFINITE);
if(result == WAIT_OBJECT_1 ) /* open? */
{
do /* while port is open, work */
{
caller->_send_data();
caller->_receive_data();
result = caller->_wait_for_io(); /* will wait for the same 3 as in handle_array above,
plus all read/write specific events */
} while (result != WAIT_OBJECT_0 && /* while not kill thread */
result != WAIT_OBJECT_2); /* while not close port */
}
else if(result == WAIT_OBJECT_2) /* close? */
{
; /* do nothing */
}
} while (result != WAIT_OBJECT_0); /* kill thread? */
return 0;
}
which in turn calls the following three functions:
void ThreadedComport::_send_data (void)
{
uint32 send_buf_size;
if(_send_buf.size() != 0) // anything to send?
{
WaitForSingleObject(_hmutex_port, INFINITE);
if(_is_open) // double-check port
{
bool result;
WaitForSingleObject(_hmutex_send, INFINITE);
_data_sent = 0;
send_buf_size = _send_buf.size();
if(send_buf_size > (uint32)_MAX_MESSAGE_LENGTH)
{
send_buf_size = _MAX_MESSAGE_LENGTH;
}
_write_data = new uint8 [send_buf_size];
for(uint32 i=0; i<send_buf_size; i++)
{
_write_data[i] = _send_buf.front();
_send_buf.pop_front();
}
_send_buf.clear();
ReleaseMutex(_hmutex_send);
result = WriteFileEx (_hcom, // handle to output file
(void*)_write_data, // pointer to input buffer
send_buf_size, // number of bytes to write
(LPOVERLAPPED)&_send_overlapped, // pointer to async. i/o data
(LPOVERLAPPED_COMPLETION_ROUTINE )&_send_callback);
SleepEx(INFINITE, true); // Allow callback to come
if(result == false)
{
// error handling here
}
} // if(_is_open)
ReleaseMutex(_hmutex_port);
}
else /* nothing to send */
{
SetEvent(_hevent_write_done); // Skip write
}
}
void ThreadedComport::_receive_data (void)
{
WaitForSingleObject(_hmutex_port, INFINITE);
if(_is_open)
{
BOOL result;
_bytes_read = 0;
result = ReadFileEx (_hcom, // handle to output file
(void*)&_read_data, // pointer to input buffer
1, // number of bytes to read
(OVERLAPPED*)&_rec_overlapped, // pointer to async. i/o data
(LPOVERLAPPED_COMPLETION_ROUTINE )&_receive_callback);
SleepEx(INFINITE, true); // Allow callback to come
if(result == FALSE)
{
DWORD last_error = GetLastError();
if(last_error == ERROR_OPERATION_ABORTED) // disconnected ?
{
close(); // close the port
}
}
}
ReleaseMutex(_hmutex_port);
}
DWORD ThreadedComport::_wait_for_io (void)
{
DWORD result;
bool is_write_done = false;
bool is_read_done = false;
HANDLE handle_array [5] =
{
_hevent_kill_thread,
_hevent_open,
_hevent_close,
_hevent_write_done,
_hevent_read_done
};
do /* COM port message pump running until sending / receiving is done */
{
result = WaitForMultipleObjects(5,
handle_array,
false, /* dont wait for all */
INFINITE);
if(result <= WAIT_OBJECT_2)
{
break; /* abort */
}
else if(result == WAIT_OBJECT_3) /* write done */
{
is_write_done = true;
SetEvent(_hevent_ext_send);
}
else if(result == WAIT_OBJECT_4) /* read done */
{
is_read_done = true;
if(_bytes_read > 0)
{
uint32 errors = 0;
WaitForSingleObject(_hmutex_rec, INFINITE);
_rec_buf.push_back((uint8)_read_data);
_data_received += _bytes_read;
while((uint16)_rec_buf.size() > _MAX_MESSAGE_LENGTH)
{
_rec_buf.pop_front();
}
ReleaseMutex(_hmutex_rec);
_bytes_read = 0;
ClearCommError(_hcom, &errors, NULL);
SetEvent(_hevent_ext_receive);
}
}
} while(!is_write_done || !is_read_done);
return result;
}
Asynchronous I/O callback functions:
void WINAPI ThreadedComport::_send_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped)
{
ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;
if(dwErrorCode == 0) // no errors
{
if(dwNumberOfBytesTransfered > 0)
{
_this->_data_sent = dwNumberOfBytesTransfered;
}
}
delete [] _this->_write_data; /* always clean this up */
SetEvent(lpOverlapped->hEvent);
}
void WINAPI ThreadedComport::_receive_callback (DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped)
{
if(dwErrorCode == 0) // no errors
{
if(dwNumberOfBytesTransfered > 0)
{
ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;
_this->_bytes_read = dwNumberOfBytesTransfered;
}
}
SetEvent(lpOverlapped->hEvent);
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
第一个问题很简单。这个方法并不黑客;您拥有
OVERLAPPED
内存及其后面的所有内容。 Raymond Chen 对此进行了最好的描述: http://blogs .msdn.com/b/oldnewthing/archive/2010/12/17/10106259.aspx只有在等待 I/O 完成时有更好的事情要做,您才会期望性能得到改进。如果您所做的只是
SleepEx
,您只会看到 CPU% 下降。线索就在名称“重叠”中——它允许您重叠计算和 I/O。std::deque
可以处理 FIFO 数据,没有大问题。它可能会回收 4KB 块(通过广泛的分析确定的精确数字,全部为您完成)。[编辑]
我进一步研究了您的代码,看起来代码不必要地复杂。对于初学者来说,异步 I/O 的主要好处之一是您不需要所有线程的东西。线程允许您使用更多内核,但您正在处理缓慢的 I/O 设备。即使是单个核心也足够了,如果它不花费所有时间等待。这正是重叠 I/O 的用途。您只需将一个线程专用于端口的所有 I/O 工作。由于它是唯一的线程,因此不需要互斥体来访问该端口。
OTOH,您需要在
deque
对象周围有一个互斥体,因为生产者/消费者线程与 comport 线程不同。The first question is simple. The method is not hackish; you own the
OVERLAPPED
memory and everything that follows it. This is best described by Raymond Chen: http://blogs.msdn.com/b/oldnewthing/archive/2010/12/17/10106259.aspxYou would only expect a performance improvement if you've got better things to while waiting for the I/O to complete. If all you do is
SleepEx
, you'll only see CPU% go down. The clue is in the name "overlapped" - it allows you to overlap calculations and I/O.std::deque<unsigned char>
can handle FIFO data without big problems. It will probably recycle 4KB chunks (precise number determined by extensive profiling, all done for you).[edit]
I've looked into your code a bit further, and it seems the code is needlessly complex. For starters, one of the main benefits of asynchronous I/O is that you don't need all that thread stuff. Threads allow you to use more cores, but you're dealing with a slow I/O device. Even a single core is sufficient, if it doesn't spend all its time waiting. And that's precisely what overlapped I/O is for. You just dedicate one thread to all I/O work for the port. Since it's the only thread, it doesn't need a mutex to access that port.
OTOH, you would want a mutex around the
deque<uint8>
objects since the producer/consumer threads aren't the same as the comport thread.我不认为有任何理由在这样的项目中使用异步 I/O。当您处理大量套接字或在等待数据时有工作要做时,异步 I/O 很有用,但据我所知,您只处理单个套接字,并且在其间不执行任何工作。
另外,为了了解知识,您通常会使用 I/O 完成端口来处理异步 I/O。我不确定是否存在使用 I/O 完成端口对性能产生负面影响的情况。
但是,是的,您的异步 I/O 使用情况看起来不错。实现您自己的 OVERLAPPED 结构确实看起来像黑客,但它是正确的;没有其他方法可以将您自己的数据与完成相关联。
Boost 还有一个循环缓冲区实现,尽管我不确定它是否是线程安全的。不过,标准库容器都不是线程安全的。
I don't see any reason for using asynchronous I/O in a project like this. Asynchronous I/O is good when you're handling a large number of sockets or have work to do while waiting for data, but as far as I can tell, you're only dealing with a single socket and not doing any work in between.
Also, just for the sake of knowledge, you would normally use an I/O completion port to handle your asynchronous I/O. I'm not sure if there are any situations where using an I/O completion port has a negative impact on performance.
But yes, your asynchronous I/O usage looks okay. Implementing your own
OVERLAPPED
struct does look like a hack, but it is correct; there's no other way to associate your own data with the completion.Boost also has a circular buffer implementation, though I'm not sure if it's thread safe. None of the standard library containers are thread safe, though.
我认为您的代码设计欠佳。
我猜你正在与太多线程共享太多数据结构。我认为您应该将一个端口的串行设备 IO 的所有处理放入单个线程中,并在 IO 线程和所有客户端线程之间放置一个同步命令/数据队列。让 IO 线程留意队列中的命令/数据。
您似乎正在为每个发送的事件分配和释放一些缓冲区。避免这种情况。如果将所有 IO 保留在单个线程中,则可以重用单个缓冲区。无论如何,您都在限制消息的大小,您只需预先分配一个足够大的缓冲区即可。
将要发送的字节放入
std::deque
并不是最佳选择。您必须将它们序列化为WriteFile()
的连续内存块。相反,如果您在一个 IO 线程和其他线程之间使用某种命令和数据队列,则可以让客户端线程立即提供连续的内存块。将
一次读取 1 个字节似乎也很愚蠢。除非它不适用于串行设备,否则您可以为
ReadFileEx()
提供足够大的缓冲区。它返回实际已读取的字节数。它不应该阻塞,AFAIK,除非我当然错了。您正在等待重叠 IO 使用
SleepEx()
调用完成。如果您刚刚结束同步,那么重叠 IO 的意义何在?I think that your code has suboptimal design.
You are sharing too many data structures with too many threads, I guess. I think that you should put all handling of the serial device IO for one port into a single thread and put a synchronized command/data queue between the IO thread and all client threads. Have the IO thread watch out for commands/data in the queue.
You seem to be allocating and freeing some buffers for each sent event. Avoid that. If you keep all the IO in a single thread, you can reuse a single buffer. You are limiting the size of the message anyway, you can just pre-allocate a single big enough buffer.
Putting the bytes that you want to send into a
std::deque
is suboptimal. You have to serialize them into a continuous memory block for theWriteFile()
. Instead, if you use some sort of commdand/data queue between one IO thread and other threads, you can have the client threads provide the continuous chunk of memory at once.Reading 1 byte at a time seem silly, too. Unless it does not work for serial devices, you could provide large enough buffer to
ReadFileEx()
. It returns how many bytes it has actually managed to read. It should not block, AFAIK, unless of course I am wrong.You are waiting for the overlapped IO to finish using the
SleepEx()
invocation. What is the point of the overlapped IO then if you are just ending up being synchronous?