如何实现无锁但阻塞的行为?

发布于 2024-11-09 00:33:49 字数 195 浏览 0 评论 0原文

我正在为密集型网络应用程序实现一个无锁的单生产者单消费者队列。我有一堆工作线程在它们自己的单独队列中接收工作,然后它们将其出队并进行处理。

去掉这些队列的锁,大大提高了高负载下的性能,但是当队列为空时它们不再阻塞,进而导致CPU使用率飙升。

如何有效地使线程阻塞,直到它可以成功地将某些内容出队或被杀死/中断?

I'm implementing a lock-free single producer single consumer queue for an intensive network application. I have a bunch of worker threads receiving work in their own separate queues, which they then dequeue and process.

Removing the locks from these queues have greatly improved the performance under high load, but they no longer block when the queues are empty, which in turn causes the CPU usage to skyrocket.

How can I efficiently cause a thread to block until it can successfully dequeue something or is killed/interrupted?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

泪眸﹌ 2024-11-16 00:33:49

如果您使用的是 Linux,请考虑使用 Futex。它通过使用原子操作而不是像互斥体那样的内核调用来提供非锁定实现的性能,但是如果您由于某些条件不成立(即锁争用)而需要将进程设置为空闲,它将然后进行适当的内核调用以使进程进入睡眠状态并在将来发生事件时将其唤醒。它基本上就像一个非常快的信号量。

If you're on Linux, look into using a Futex. It provides the performance of a non-locking implementation by using atomic operations rather than kernel calls like a mutex would, but should you need to set the process to idle because of some condition not being true (i.e., lock-contention), it will then make the appropriate kernel calls to put the process to sleep and wake it back up at a future event. It's basically like a very fast semaphore.

感悟人生的甜 2024-11-16 00:33:49

在 Linux 上,futex 可用于阻止线程。但请注意,Futex 很棘手

更新:条件变量比 futex 使用起来更安全,并且更便携。但是,条件变量与互斥体结合使用,因此严格来说,结果将不再是无锁的。然而,如果你的主要目标是性能(而不是全局进度的保证),并且锁定部分(即线程唤醒后检查的条件)很小,那么你可能会得到满意的结果,而无需进入将 futexes 集成到算法中的微妙之处。

On Linux, futex can be used to block a thread. But be aware that Futexes Are Tricky!

UPDATE: condition variables are much safer to use than futexes, and are more portable. However, a condition variable is used in combination with a mutex, so strictly speaking the result will not be lock-free anymore. However, if your primary goal is performance (and not the guaranty of global progress), and the locked portion (i.e. a condition to check after thread wakeup) is small, it might happen that you will get satisfactory results without the need to go into subtleties of integrating futexes into the algorithm.

逆夏时光 2024-11-16 00:33:49

如果您使用的是 Windows,则无法使用 futexes,但 Windows Vista 有一个类似的机制,称为 Keyed事件。不幸的是,这不是已发布的 API 的一部分(它是 NTDLL 本机 API),但只要您接受它可能在未来版本的 Windows 中更改的警告(并且您不需要在Vista 之前的内核)。请务必阅读我上面链接的文章。以下是其工作原理的未经测试草图:

/* Interlocked SList queue using keyed event signaling */

struct queue {
    SLIST_HEADER slist;
    // Note: Multiple queues can (and should) share a keyed event handle
    HANDLE keyed_event;
    // Initial value: 0
    // Prior to blocking, the queue_pop function increments this to 1, then
    // rechecks the queue. If it finds an item, it attempts to compxchg back to
    // 0; if this fails, then it's racing with a push, and has to block
    LONG block_flag;
};

void init_queue(queue *qPtr) {
    NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
    InitializeSListHead(&qPtr->slist);
    qPtr->blocking = 0;
}

void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
    InterlockedPushEntrySList(&qPtr->slist, entry);

    // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
    // have committed to a keyed-event handshake
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
    if (oldv) {
        NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    }
}

SLIST_ENTRY *queue_pop(queue *qPtr) {
    SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry)
        return entry; // fast path

    // Transition block flag 0 -> 1. We must recheck the queue after this point
    // in case we race with queue_push; however since ReleaseKeyedEvent
    // blocks until it is matched up with a wait, we must perform the wait if
    // queue_push sees us
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);

    assert(oldv == 0);

    entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry) {
        // Try to abort
        oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
        if (oldv == 1)
            return entry; // nobody saw us, we can just exit with the value
    }

    // Either we don't have an entry, or we are forced to wait because
    // queue_push saw our block flag. So do the wait
    NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    // block_flag has been reset by queue_push

    if (!entry)
        entry = InterlockedPopEntrySList(&qPtr->slist);
    assert(entry);

    return entry;
}

您还可以使用类似的协议 Slim 读写 锁和 条件变量,具有无锁快速路径。这些是键控事件的包装器,因此它们可能比直接使用键控事件产生更多的开销。

If you're on Windows, you won't be able to use futexes, but Windows Vista has a similar mechanism called Keyed Events. Unfortunately, this isn't part of the published API (it's an NTDLL native API), but you can use it as long as you accept the caveat that it might change in future versions of Windows (and you don't need to run on pre-Vista kernels). Be sure to read the article I linked above. Here's an untested sketch of how it might work:

/* Interlocked SList queue using keyed event signaling */

struct queue {
    SLIST_HEADER slist;
    // Note: Multiple queues can (and should) share a keyed event handle
    HANDLE keyed_event;
    // Initial value: 0
    // Prior to blocking, the queue_pop function increments this to 1, then
    // rechecks the queue. If it finds an item, it attempts to compxchg back to
    // 0; if this fails, then it's racing with a push, and has to block
    LONG block_flag;
};

void init_queue(queue *qPtr) {
    NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
    InitializeSListHead(&qPtr->slist);
    qPtr->blocking = 0;
}

void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
    InterlockedPushEntrySList(&qPtr->slist, entry);

    // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
    // have committed to a keyed-event handshake
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
    if (oldv) {
        NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    }
}

SLIST_ENTRY *queue_pop(queue *qPtr) {
    SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry)
        return entry; // fast path

    // Transition block flag 0 -> 1. We must recheck the queue after this point
    // in case we race with queue_push; however since ReleaseKeyedEvent
    // blocks until it is matched up with a wait, we must perform the wait if
    // queue_push sees us
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);

    assert(oldv == 0);

    entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry) {
        // Try to abort
        oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
        if (oldv == 1)
            return entry; // nobody saw us, we can just exit with the value
    }

    // Either we don't have an entry, or we are forced to wait because
    // queue_push saw our block flag. So do the wait
    NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    // block_flag has been reset by queue_push

    if (!entry)
        entry = InterlockedPopEntrySList(&qPtr->slist);
    assert(entry);

    return entry;
}

You could also use a similar protocol using Slim Read Write locks and Condition Variables, with a lockless fast path. These are wrappers over keyed events, so they may incur more overhead than using keyed events directly.

予囚 2024-11-16 00:33:49

您尝试过条件等待吗?当队列变空时,就开始等待新的作业。将作业放入队列的线程应该触发该信号。这样,只有当队列为空时才使用锁。

https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables

Have you tried conditional waiting? When the queue becomes empty, just start waiting for a new job. The thread putting jobs in the queue should fire the signal. This way you only use locks when the queue is empty.

https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables

一袭白衣梦中忆 2024-11-16 00:33:49

您可以使用 sigwait() 函数使线程休眠。您可以使用 pthread_kill 唤醒线程。这比条件变量快得多。

You can cause a thread to sleep by using the sigwait() function. You can wake the thread with pthread_kill. This is much faster than condition variables.

小矜持 2024-11-16 00:33:49

您可以在等待时添加睡眠。只需选择您愿意等待的最长等待时间,然后执行类似以下操作(伪代码,因为我不记得 pthread 语法):

WAIT_TIME = 100; // Set this to whatever you're happy with
while(loop_condition) {
   thing = get_from_queue()
   if(thing == null) {
       sleep(WAIT_TIME);
   } else {
       handle(thing);
   }
}

即使像 100 毫秒睡眠这样短暂的操作也应该显着降低 CPU 使用率。我不确定上下文切换在什么时候会比忙于等待更糟糕。

You could add sleeps while it's waiting. Just pick the biggest wait you're willing to have, then do something like this (pseudocode because I don't remember pthread syntax):

WAIT_TIME = 100; // Set this to whatever you're happy with
while(loop_condition) {
   thing = get_from_queue()
   if(thing == null) {
       sleep(WAIT_TIME);
   } else {
       handle(thing);
   }
}

Even something short like a 100 ms sleep should significantly lower the CPU usage. I'm not sure at what point the context switching will make it worse than busy waiting though.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文