C - how to use aio_read() and aio_write() together
I am implementing a game server where I need to both read and write. I accept an incoming connection and start reading from it using aio_read(), but when I need to send something, I stop reading using aio_cancel() and then use aio_write(). In the write's completion callback I resume reading. So I read all the time, but when I need to send something, I pause reading.
This works only about 20% of the time. In the other cases the call to aio_cancel() fails with "Operation now in progress", and I cannot cancel the read even by retrying in an endless while loop. So the write operation I queued never happens.
How should these functions be used properly? What am I missing?
EDIT: running under Linux 2.6.35, Ubuntu 10, 32-bit.
Example code:
void handle_read(union sigval sigev_value) { /* handle data or disconnection */ }
void handle_write(union sigval sigev_value) { /* free writing buffer memory */ }

void start()
{
    const int acceptorSocket = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(struct sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);
    bind(acceptorSocket, (struct sockaddr*)&addr, sizeof(struct sockaddr_in));
    listen(acceptorSocket, SOMAXCONN);
    struct sockaddr_in address;
    socklen_t addressLen = sizeof(struct sockaddr_in);
    for(;;)
    {
        const int incomingSocket = accept(acceptorSocket, (struct sockaddr*)&address, &addressLen);
        if(incomingSocket == -1)
        { /* handle error ... */ }
        else
        {
            //tell the socket to append outgoing messages when writing:
            const int currentFlags = fcntl(incomingSocket, F_GETFL, 0);
            if(currentFlags < 0) { /* handle error ... */ }
            if(fcntl(incomingSocket, F_SETFL, currentFlags | O_APPEND) == -1) { /* handle another error ... */ }
            //start reading:
            struct aiocb* readingAiocb = new struct aiocb;
            memset(readingAiocb, 0, sizeof(struct aiocb));
            readingAiocb->aio_nbytes = MY_SOME_BUFFER_SIZE;
            readingAiocb->aio_fildes = incomingSocket;
            readingAiocb->aio_buf = mySomeReadBuffer;
            readingAiocb->aio_sigevent.sigev_notify = SIGEV_THREAD;
            readingAiocb->aio_sigevent.sigev_value.sival_ptr = (void*)mySomeData;
            readingAiocb->aio_sigevent.sigev_notify_function = handle_read;
            if(aio_read(readingAiocb) != 0) { /* handle error ... */ }
        }
    }
}

//called at any time from server side:
void send(void* data, const size_t dataLength)
{
    //... some thread-safety precautions not needed here ...
    const int cancellingResult = aio_cancel(socketDesc, readingAiocb);
    if(cancellingResult != AIO_CANCELED)
    {
        //this one happens ~80% of the time - wrapping the previous call in an endless while loop does not help:
        if(cancellingResult == AIO_NOTCANCELED)
        {
            puts(strerror(aio_return(readingAiocb))); // "Operation now in progress"
            /* don't know what to do... */
        }
    }
    //otherwise it's okay to send:
    else
    {
        aio_write(...);
    }
}
4 Answers
If you wish to have separate AIO queues for reads and writes, so that a write issued later can execute before a read issued earlier, then you can use dup() to create a duplicate of the socket, and use one descriptor to issue reads and the other to issue writes.

However, I second the recommendation to avoid AIO entirely and simply use an epoll()-driven event loop with non-blocking sockets. This technique has been shown to scale to large numbers of clients; if you are seeing high CPU usage, profile it and find out where that is happening, because the chances are that your event loop is not the culprit.
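As a minimal sketch of the dup() idea, using a pipe in place of the socket (dup_demo and the test payload are invented for the example):

```c
#include <string.h>
#include <unistd.h>

/* Duplicate one end of a pipe and exchange data through the copy.
   Returns 0 on success, -1 on any failure. */
int dup_demo(void)
{
    int fds[2];
    if (pipe(fds) != 0) return -1;

    /* dup() yields a second descriptor for the same stream; a server
       could issue reads on fds[0] and writes on the duplicate. */
    int readCopy = dup(fds[0]);
    if (readCopy < 0) return -1;

    if (write(fds[1], "ping", 4) != 4) return -1;

    char buf[8] = {0};
    if (read(readCopy, buf, 4) != 4) return -1;  /* the copy sees the same data */

    int ok = (strcmp(buf, "ping") == 0);
    close(fds[0]); close(fds[1]); close(readCopy);
    return ok ? 0 : -1;
}
```

Both descriptors refer to the same open file description, so each can be handed to its own AIO submission path.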
First of all, consider dumping aio. There are lots of other ways to do asynchronous I/O that are not as braindead (yes, aio is braindead). There are lots of alternatives; if you're on Linux you can use libaio (io_submit and friends). aio(7) mentions this.

Back to your question. I haven't used aio in a long time, but here's what I remember. aio_read and aio_write both put requests (aiocb) on some queue. They return immediately, even though the requests will complete some time later. It's entirely possible to queue multiple requests without caring what happened to the earlier ones. So, in a nutshell: stop cancelling read requests and keep adding them. Later you're free to wait using aio_suspend, poll using aio_error, wait for signals, etc.

I see you mention epoll in your comment. You should definitely go for libaio.
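To make the "keep adding requests" point concrete, here is a sketch that queues an aio_read and an aio_write on the same descriptor without cancelling anything, then waits for both with aio_suspend. It uses a scratch file rather than a socket, and aio_queue_demo and the file name are invented for the example (link with -lrt on older glibc):

```c
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Queue a read and a write concurrently; returns 0 on success. */
int aio_queue_demo(void)
{
    int fd = open("aio_demo.tmp", O_RDWR | O_CREAT | O_TRUNC, 0600);
    if (fd < 0) return -1;
    if (write(fd, "hello, aio!", 11) != 11) return -1;

    static char readBuf[16];
    static const char writeBuf[] = " more";

    struct aiocb readCb, writeCb;
    memset(&readCb, 0, sizeof readCb);
    readCb.aio_fildes = fd;
    readCb.aio_buf    = readBuf;
    readCb.aio_nbytes = 11;
    readCb.aio_offset = 0;

    memset(&writeCb, 0, sizeof writeCb);
    writeCb.aio_fildes = fd;
    writeCb.aio_buf    = (char *)writeBuf;
    writeCb.aio_nbytes = 5;
    writeCb.aio_offset = 11;

    /* Both requests are queued; no aio_cancel() in sight. */
    if (aio_read(&readCb) != 0 || aio_write(&writeCb) != 0) return -1;

    /* Block until both requests have completed, then collect results. */
    const struct aiocb *list[] = { &readCb, &writeCb };
    while (aio_error(&readCb) == EINPROGRESS || aio_error(&writeCb) == EINPROGRESS)
        aio_suspend(list, 2, NULL);

    int ok = aio_return(&readCb) == 11
          && aio_return(&writeCb) == 5
          && strncmp(readBuf, "hello, aio!", 11) == 0;

    close(fd);
    unlink("aio_demo.tmp");
    return ok ? 0 : -1;
}
```

On a socket the aio_offset fields would be ignored, but the queuing behavior is the same: issue, keep going, and collect each result when its completion fires.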
Unless I'm mistaken, POSIX AIO (that is, aio_read(), aio_write() and so on) is guaranteed to work only on seekable file descriptors. From the aio_read() manpage:

For devices which do not have an associated file position, such as network sockets, POSIX AIO is, AFAICS, undefined. Perhaps it happens to work on your current setup, but that seems more by accident than by design.

Also, on Linux, POSIX AIO is implemented in glibc with the help of userspace threads.

That is, where possible use non-blocking IO and epoll(). However, epoll() does not work for seekable file descriptors such as regular files (the same goes for the classical select()/poll()); in that case POSIX AIO is an alternative to rolling your own thread pool.
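As a sketch of the recommended alternative, here is one iteration of an epoll()-driven loop on a non-blocking descriptor. A pipe stands in for the client socket, and epoll_demo is a made-up name:

```c
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>

/* One event-loop iteration: register, wait, drain. Returns 0 on success. */
int epoll_demo(void)
{
    int fds[2];
    if (pipe(fds) != 0) return -1;
    /* Every descriptor in an epoll loop should be non-blocking. */
    if (fcntl(fds[0], F_SETFL, O_NONBLOCK) != 0) return -1;

    int epfd = epoll_create1(0);
    if (epfd < 0) return -1;

    struct epoll_event ev = { .events = EPOLLIN, .data.fd = fds[0] };
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) != 0) return -1;

    if (write(fds[1], "x", 1) != 1) return -1;   /* simulate incoming data */

    struct epoll_event ready[4];
    int n = epoll_wait(epfd, ready, 4, 1000);    /* wait up to 1 second */

    char buf[4];
    int ok = n == 1
          && ready[0].data.fd == fds[0]
          && read(fds[0], buf, sizeof buf) == 1
          && buf[0] == 'x';

    close(epfd); close(fds[0]); close(fds[1]);
    return ok ? 0 : -1;
}
```

A real server would run epoll_wait in a loop, and writes would be attempted directly, falling back to an EPOLLOUT registration only when the socket's send buffer is full.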
There should be no reason to stop or cancel an aio read or write request just because you need to make another read or write. If that were the case, it would defeat the whole point of asynchronous reading and writing, since their main purpose is to allow you to set up a read or write operation and then move on. Since multiple requests can be queued, it would be much better to set up a couple of asynchronous reader/writer pools, where you grab a set of pre-initialized aiocb structures from an "available" pool whenever you need them, and return them to a "finished" pool when they're done and you can access the buffers they point to. While they're in the middle of an asynchronous read or write, they sit in a "busy" pool and are not touched. That way you won't have to keep creating aiocb structures on the heap dynamically every time you need to make a read or write operation. That's okay to do; it's just not very efficient if you plan to stay under a certain limit, or to have only a certain number of "in-flight" requests.

BTW, keep in mind that with several different in-flight asynchronous requests, your asynchronous read/write handler can itself be interrupted by another read/write event. So you really don't want to be doing a whole lot in your handler. In the scenario I described above, your handler would basically move the aiocb struct that triggered it from one pool to the next through the listed "available" -> "busy" -> "finished" stages. Your main code, after reading from the buffer pointed to by an aiocb structure in the "finished" pool, would then move the structure back to the "available" pool.
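One possible shape for the pool scheme described above (names and sizes are invented; a real server would add locking and per-connection bookkeeping):

```c
#include <aio.h>
#include <stddef.h>
#include <string.h>

#define POOL_SIZE 4
#define BUF_SIZE  512

enum slot_state { SLOT_AVAILABLE, SLOT_BUSY, SLOT_FINISHED };

/* A pre-allocated aiocb plus its buffer; no heap allocation per request. */
struct aio_slot {
    struct aiocb cb;
    char buf[BUF_SIZE];
    enum slot_state state;
};

static struct aio_slot pool[POOL_SIZE];   /* all start as SLOT_AVAILABLE (0) */

/* "available" -> "busy": grab a slot and point its aiocb at its own buffer. */
struct aio_slot *slot_acquire(void)
{
    for (size_t i = 0; i < POOL_SIZE; i++)
        if (pool[i].state == SLOT_AVAILABLE) {
            pool[i].state = SLOT_BUSY;
            memset(&pool[i].cb, 0, sizeof pool[i].cb);
            pool[i].cb.aio_buf = pool[i].buf;
            pool[i].cb.aio_nbytes = BUF_SIZE;
            return &pool[i];
        }
    return NULL;   /* every slot is in flight */
}

/* "busy" -> "finished": all the completion handler does is flip the state. */
void slot_complete(struct aio_slot *s) { s->state = SLOT_FINISHED; }

/* "finished" -> "available": main code calls this after consuming s->buf. */
void slot_release(struct aio_slot *s) { s->state = SLOT_AVAILABLE; }
```

The completion handler stays tiny (one state flip), which matters because another completion can interrupt it at any time.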