在 Linux 上等待多个条件变量而不进行不必要的睡眠?

发布于 2024-09-02 00:59:01 字数 657 浏览 5 评论 0原文

我正在编写一个对延迟敏感的应用程序,它实际上希望同时等待多个条件变量。我之前读过在 Linux 上获得此功能的几种方法(显然这是 Windows 上内置的),但它们似乎都不适合我的应用程序。我知道的方法是:

  1. 让一个线程等待您想要等待的每个条件变量,当它被唤醒时,将向您等待的单个条件变量发出信号。

  2. 通过定时等待循环访问多个条件变量。

  3. 将虚拟字节写入文件或管道,然后轮询它们。

#1 & #2 不适合,因为它们会导致不必要的睡眠。对于#1,您必须等待虚拟线程唤醒,然后向真实线程发出信号,然后让真实线程唤醒,而不是真正的线程只是唤醒开始 - 额外的调度程序量子花费在这实际上对我的应用程序很重要,我不想使用成熟的 RTOS。 #2 更糟糕的是,你可能会花费 N * 超时时间睡觉,或者你的超时时间将为 0,在这种情况下你永远不会睡觉(无休止地消耗 CPU 并使其他线程挨饿也很糟糕)。

对于#3,管道是有问题的,因为如果“发出信号”的线程繁忙甚至崩溃(实际上我正在处理单独的进程而不是线程——互斥体和条件将存储在共享内存中),那么写入线程将被卡住,因为管道的缓冲区将满,任何其他客户端也是如此。文件是有问题的,因为应用程序运行的时间越长,文件就会无休止地增长。

有更好的方法吗?也想知道适用于 Solaris 的答案。

I'm writing a latency sensitive app that in effect wants to wait on multiple condition variables at once. I've read before of several ways to get this functionality on Linux (apparently this is builtin on Windows), but none of them seem suitable for my app. The methods I know of are:

  1. Have one thread wait on each of the condition variables you want to wait on, which when woken will signal a single condition variable which you wait on instead.

  2. Cycling through multiple condition variables with a timed wait.

  3. Writing dummy bytes to files or pipes instead, and polling on those.

#1 & #2 are unsuitable because they cause unnecessary sleeping. With #1, you have to wait for the dummy thread to wake up, then signal the real thread, then for the real thread to wake up, instead of the real thread just waking up to begin with -- the extra scheduler quantum spent on this actually matters for my app, and I'd prefer not to have to use a full fledged RTOS. #2 is even worse, you potentially spend N * timeout time asleep, or your timeout will be 0 in which case you never sleep (endlessly burning CPU and starving other threads is also bad).

For #3, pipes are problematic because if the thread being 'signaled' is busy or even crashes (I'm in fact dealing with separate process rather than threads -- the mutexes and conditions would be stored in shared memory), then the writing thread will be stuck because the pipe's buffer will be full, as will any other clients. Files are problematic because you'd be growing it endlessly the longer the app ran.

Is there a better way to do this? Curious for answers appropriate for Solaris as well.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

蒗幽 2024-09-09 00:59:01

#3 选项(将虚拟字节写入文件或管道,然后轮询这些)在 Linux 上有更好的替代方案: eventfd

使用 eventfd,您将拥有一个内核内无符号 64 位计数器,而不是有限大小的缓冲区(如在管道中)或无限增长的缓冲区(如在文件中)。 8 字节write 向计数器添加一个数字; 8 字节读取要么将计数器清零并返回其先前的值(不带 EFD_SEMAPHORE),要么将计数器减 1 并返回 1(带 EFD_SEMAPHORE)代码>)。当计数器非零时,轮询函数(selectpollepoll)认为文件描述符是可读的。

即使计数器接近 64 位限制,如果您将文件描述符设置为非阻塞,write 也会失败并返回 EAGAIN。当计数器为零时,read 也会发生同样的情况。

Your #3 option (writing dummy bytes to files or pipes instead, and polling on those) has a better alternative on Linux: eventfd.

Instead of a limited-size buffer (as in a pipe) or an infinitely-growing buffer (as in a file), with eventfd you have an in-kernel unsigned 64-bit counter. An 8-byte write adds a number to the counter; an 8-byte read either zeroes the counter and returns its previous value (without EFD_SEMAPHORE), or decrements the counter by 1 and returns 1 (with EFD_SEMAPHORE). The file descriptor is considered readable to the polling functions (select, poll, epoll) when the counter is nonzero.

Even if the counter is near the 64-bit limit, the write will just fail with EAGAIN if you made the file descriptor non-blocking. The same happens with read when the counter is zero.

无语# 2024-09-09 00:59:01

如果您谈论的是 POSIX 线程,我建议使用单个条件变量和事件标志数量或类似的东西。这个想法是使用对等 condvar 互斥体来保护事件通知。无论如何,您需要在 cond_wait() 退出后检查事件。这是我的足够旧的代码,可以从我的培训中说明这一点(是的,我检查了它是否运行,但请注意它是在一段时间前准备好的,并且是为了新手而准备的)。

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_cond_t var;
static pthread_mutex_t mtx;

unsigned event_flags = 0;
#define FLAG_EVENT_1    1
#define FLAG_EVENT_2    2

void signal_1()
{
    pthread_mutex_lock(&mtx);
    event_flags |= FLAG_EVENT_1;
    pthread_cond_signal(&var);
    pthread_mutex_unlock(&mtx);
}

void signal_2()
{
    pthread_mutex_lock(&mtx);
    event_flags |= FLAG_EVENT_2;
    pthread_cond_signal(&var);
    pthread_mutex_unlock(&mtx);
}

void* handler(void*)
{
    // Mutex is unlocked only when we wait or process received events.
    pthread_mutex_lock(&mtx);

    // Here should be race-condition prevention in real code.

    while(1)
    {
        if (event_flags)
        {
            unsigned copy = event_flags;

            // We unlock mutex while we are processing received events.
            pthread_mutex_unlock(&mtx);

            if (copy & FLAG_EVENT_1)
            {
                printf("EVENT 1\n");
                copy ^= FLAG_EVENT_1;
            }

            if (copy & FLAG_EVENT_2)
            {
                printf("EVENT 2\n");
                copy ^= FLAG_EVENT_2;

                // And let EVENT 2 to be 'quit' signal.
                // In this case for consistency we break with locked mutex.
                pthread_mutex_lock(&mtx);
                break;
            }

            // Note we should have mutex locked at the iteration end.
            pthread_mutex_lock(&mtx);
        }
        else
        {
            // Mutex is locked. It is unlocked while we are waiting.
            pthread_cond_wait(&var, &mtx);
            // Mutex is locked.
        }
    }

    // ... as we are dying.
    pthread_mutex_unlock(&mtx);
}

int main()
{
    pthread_mutex_init(&mtx, NULL);
    pthread_cond_init(&var, NULL);

    pthread_t id;
    pthread_create(&id, NULL, handler, NULL);
    sleep(1);

    signal_1();
    sleep(1);
    signal_1();
    sleep(1);
    signal_2();
    sleep(1);

    pthread_join(id, NULL);
    return 0;
}

If you are talking about POSIX threads I'd recommend to use single condition variable and number of event flags or something alike. The idea is to use peer condvar mutex to guard event notifications. You anyway need to check for event after cond_wait() exit. Here is my old enough code to illustrate this from my training (yes, I checked that it runs, but please note it was prepared some time ago and in a hurry for newcomers).

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_cond_t var;
static pthread_mutex_t mtx;

unsigned event_flags = 0;
#define FLAG_EVENT_1    1
#define FLAG_EVENT_2    2

void signal_1()
{
    pthread_mutex_lock(&mtx);
    event_flags |= FLAG_EVENT_1;
    pthread_cond_signal(&var);
    pthread_mutex_unlock(&mtx);
}

void signal_2()
{
    pthread_mutex_lock(&mtx);
    event_flags |= FLAG_EVENT_2;
    pthread_cond_signal(&var);
    pthread_mutex_unlock(&mtx);
}

void* handler(void*)
{
    // Mutex is unlocked only when we wait or process received events.
    pthread_mutex_lock(&mtx);

    // Here should be race-condition prevention in real code.

    while(1)
    {
        if (event_flags)
        {
            unsigned copy = event_flags;

            // We unlock mutex while we are processing received events.
            pthread_mutex_unlock(&mtx);

            if (copy & FLAG_EVENT_1)
            {
                printf("EVENT 1\n");
                copy ^= FLAG_EVENT_1;
            }

            if (copy & FLAG_EVENT_2)
            {
                printf("EVENT 2\n");
                copy ^= FLAG_EVENT_2;

                // And let EVENT 2 to be 'quit' signal.
                // In this case for consistency we break with locked mutex.
                pthread_mutex_lock(&mtx);
                break;
            }

            // Note we should have mutex locked at the iteration end.
            pthread_mutex_lock(&mtx);
        }
        else
        {
            // Mutex is locked. It is unlocked while we are waiting.
            pthread_cond_wait(&var, &mtx);
            // Mutex is locked.
        }
    }

    // ... as we are dying.
    pthread_mutex_unlock(&mtx);
}

int main()
{
    pthread_mutex_init(&mtx, NULL);
    pthread_cond_init(&var, NULL);

    pthread_t id;
    pthread_create(&id, NULL, handler, NULL);
    sleep(1);

    signal_1();
    sleep(1);
    signal_1();
    sleep(1);
    signal_2();
    sleep(1);

    pthread_join(id, NULL);
    return 0;
}
黑寡妇 2024-09-09 00:59:01

如果您希望在 POSIX 条件变量同步模型下获得最大的灵活性,则必须避免编写仅通过公开条件变量的方式向用户传达事件的模块。 (这样您实际上就重新发明了一个信号量。)

活动模块的设计应使其接口通过注册的函数提供事件的回调通知:并且,如有必要,可以注册多个回调。

多个模块的客户端向每个模块注册一个回调。这些都可以被路由到一个公共位置,在那里它们锁定相同的互斥锁,更改某些状态,解锁并命中相同的条件变量。

这种设计还提供了一种可能性,如果响应事件所做的工作量相当小,也许可以在回调的上下文中完成。

回调在调试方面也有一些优势。您可以在以回调形式到达的事件上放置断点,并查看其生成方式的调用堆栈。如果您在作为信号量唤醒或通过某种消息传递机制到达的事件上放置断点,则调用跟踪不会揭示事件的起源。


话虽这么说,您可以使用互斥体和条件变量创建自己的同步原语,以支持等待多个对象。这些同步原语可以在内部基于回调,以对应用程序的其余部分不可见的方式。

其要点是,对于线程想要等待的每个对象,等待操作都会将该对象的回调接口排队。当一个对象收到信号时,它会调用所有已注册的回调。唤醒的线程使所有回调接口出队,并查看每个回调接口中的一些状态标志以查看哪些对象发出了信号。

If you want maximum flexibility under the POSIX condition variable model of synchronization, you must avoid writing modules which communicate events to their users only by means of exposing a condition variable. (You have then essentially reinvented a semaphore.)

Active modules should be designed such that their interfaces provide callback notifications of events, via registered functions: and, if necessary, such that multiple callbacks can be registered.

A client of multiple modules registers a callback with each of them. These can all be routed into a common place where they lock the same mutex, change some state, unlock, and hit the same condition variable.

This design also offers the possibility that, if the amount of work done in response to an event is reasonably small, perhaps it can just be done in the context of the callback.

Callbacks also have some advantages in debugging. You can put a breakpoint on an event which arrives in the form of a callback, and see the call stack of how it was generated. If you put a breakpoint on an event that arrives as a semaphore wakeup, or via some message passing mechanism, the call trace doesn't reveal the origin of the event.


That being said, you can make your own synchronization primitives with mutexes and condition variables which support waiting on multiple objects. These synchronization primitives can be internally based on callbacks, in a way that is invisible to the rest of the application.

The gist of it is that for each object that a thread wants to wait on, the wait operation queues a callback interface with that object. When an object is signaled, it invokes all of its registered callbacks. The woken threads dequeue all the callback interfaces, and peek at some status flags in each one to see which objects signaled.

若能看破又如何 2024-09-09 00:59:01

为了等待多个条件变量,Solaris 有一个实现,如果您有兴趣,可以将其移植到 Linux:等待 API

For waiting on multiple condition variables, there is an implementation for Solaris that you could port to Linux if you're interested: WaitFor API

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文