有没有并发无锁阻塞队列的实现?

发布于 2024-11-02 16:53:29 字数 3410 浏览 5 评论 0原文

我知道阻塞队列和无锁队列,这是 Scott 等人,但是是否有无锁阻塞队列的实现?

在无锁阻塞队列中,出队不需要锁定,但如果队列中没有项目,它将阻塞消费者。有这样一个野兽的实现吗?我更喜欢它们是 C# 实现,但任何实现在技术上都可以工作。

更新:

我认为我最终在 D14.1 行上遇到了竞争条件:

initialize(Q: pointer to queue t)
node = new node() // Allocate a free node
node–>next.ptr = NULL // Make it the only node in the linked list
Q–>Head = Q–>Tail = node // Both Head and Tail point to it
signal = new ManualResetEvent() // create a manual reset event

    enqueue(Q: pointer to queue t, value: data type)
E1:     node = new node() // Allocate a new node from the free list
E2:     node–>value = value // Copy enqueued value into node
E3:     node–>next.ptr = NULL // Set next pointer of node to NULL
E4:     loop // Keep trying until Enqueue is done
E5:         tail = Q–>Tail // Read Tail.ptr and Tail.count together
E6:         next = tail.ptr–>next // Read next ptr and count fields together
E7:         if tail == Q–>Tail // Are tail and next consistent?
E8:             if next.ptr == NULL // Was Tail pointing to the last node?
E9:                 if CAS(&tail.ptr–>next, next, <node, next.count+1>) // Try to link node at the end of the linked list
E10.1:                  signal.Set() // Signal to the blocking dequeues
E10.2:                  break // Enqueue is done. Exit loop
E11:                endif
E12:            else // Tail was not pointing to the last node
E13:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Try to swing Tail to the next node
E14:            endif
E15:        endif
E16:     endloop
E17:    CAS(&Q–>Tail, tail, <node, tail.count+1>) // Enqueue is done. Try to swing Tail to the inserted node


    dequeue(Q: pointer to queue t, pvalue: pointer to data type): boolean
D1:     loop // Keep trying until Dequeue is done
D2:         head = Q–>Head // Read Head
D3:         tail = Q–>Tail // Read Tail
D4:         next = head–>next // Read Head.ptr–>next
D5:         if head == Q–>Head // Are head, tail, and next consistent?
D6:             if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7:                 if next.ptr == NULL // Is queue empty?
D8.1:                   signal.WaitOne() // Block until an enqueue
D8.X:                   // remove the return --- return FALSE // Queue is empty, couldn’t dequeue
D9:                 endif
D10:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Tail is falling behind. Try to advance it
D11:            else // No need to deal with Tail
                    // Read value before CAS, otherwise another dequeue might free the next node
D12:                *pvalue = next.ptr–>value
D13:                if CAS(&Q–>Head, head, <next.ptr, head.count+1>) // Try to swing Head to the next node
D14.1:                  if(head.ptr == tail.ptr && next.ptr==NULL) // Is queue empty? <--- POSSIBLE RACE CONDITION???
D14.2:                      signal.Reset()
D14.3:                  break // Dequeue is done. Exit loop
D15:                endif
D16:            endif
D17:         endif
D18:    endloop
D19:    free(head.ptr) // It is safe now to free the old dummy node
D20:    return TRUE // Queue was not empty, dequeue succeeded

I'm aware of blocking-queues and lock-free queues, a great example of those implementations being provided by Scott et al., but are there any implementations of a lock-free-blocking-queue?

In a lock-free-blocking-queue the dequeue will require no locking, but if there are no items in the queue it will block the consumer. Are there any implementations of such a beast? I prefer if they're C# implementations, but any implementation would technically work.

Update:

I think I end up with a race condition on line D14.1:

initialize(Q: pointer to queue t)
node = new node() // Allocate a free node
node–>next.ptr = NULL // Make it the only node in the linked list
Q–>Head = Q–>Tail = node // Both Head and Tail point to it
signal = new ManualResetEvent() // create a manual reset event

    enqueue(Q: pointer to queue t, value: data type)
E1:     node = new node() // Allocate a new node from the free list
E2:     node–>value = value // Copy enqueued value into node
E3:     node–>next.ptr = NULL // Set next pointer of node to NULL
E4:     loop // Keep trying until Enqueue is done
E5:         tail = Q–>Tail // Read Tail.ptr and Tail.count together
E6:         next = tail.ptr–>next // Read next ptr and count fields together
E7:         if tail == Q–>Tail // Are tail and next consistent?
E8:             if next.ptr == NULL // Was Tail pointing to the last node?
E9:                 if CAS(&tail.ptr–>next, next, <node, next.count+1>) // Try to link node at the end of the linked list
E10.1:                  signal.Set() // Signal to the blocking dequeues
E10.2:                  break // Enqueue is done. Exit loop
E11:                endif
E12:            else // Tail was not pointing to the last node
E13:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Try to swing Tail to the next node
E14:            endif
E15:        endif
E16:     endloop
E17:    CAS(&Q–>Tail, tail, <node, tail.count+1>) // Enqueue is done. Try to swing Tail to the inserted node


    dequeue(Q: pointer to queue t, pvalue: pointer to data type): boolean
D1:     loop // Keep trying until Dequeue is done
D2:         head = Q–>Head // Read Head
D3:         tail = Q–>Tail // Read Tail
D4:         next = head–>next // Read Head.ptr–>next
D5:         if head == Q–>Head // Are head, tail, and next consistent?
D6:             if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7:                 if next.ptr == NULL // Is queue empty?
D8.1:                   signal.WaitOne() // Block until an enqueue
D8.X:                   // remove the return --- return FALSE // Queue is empty, couldn’t dequeue
D9:                 endif
D10:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Tail is falling behind. Try to advance it
D11:            else // No need to deal with Tail
                    // Read value before CAS, otherwise another dequeue might free the next node
D12:                *pvalue = next.ptr–>value
D13:                if CAS(&Q–>Head, head, <next.ptr, head.count+1>) // Try to swing Head to the next node
D14.1:                  if(head.ptr == tail.ptr && next.ptr==NULL) // Is queue empty? <--- POSSIBLE RACE CONDITION???
D14.2:                      signal.Reset()
D14.3:                  break // Dequeue is done. Exit loop
D15:                endif
D16:            endif
D17:         endif
D18:    endloop
D19:    free(head.ptr) // It is safe now to free the old dummy node
D20:    return TRUE // Queue was not empty, dequeue succeeded

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

我不在是我 2024-11-09 16:53:30

编辑:

更简单:
我建议你的队列不需要头和尾。只要有一个头。如果 head = NULL,则列表为空。将项目添加到头部。从头上取下物品。更简单、更少的 CAS 操作。

帮助者:
我在评论中建议您需要考虑一个辅助方案来处理比赛。在我的“无锁”含义版本中,如果罕见的竞争条件不会引起问题,那么它们是可以接受的。我喜欢额外的性能,而不是让空闲线程休眠几毫秒太久。

帮手想法。当消费者抓取工作时,它可以检查是否有线程处于昏迷状态。当生产者添加工作时,它可以在逗号中查找线程。

所以跟踪睡眠者。使用枕木的链接列表。当一个线程决定没有工作时,它会将自己标记为 !awake,并且 CAS 将自己标记为 sleeper 列表的头部。当接收到唤醒信号时,线程将自身标记为唤醒。然后新唤醒的线程会清理sleeper列表。要清理并发单链表,你必须小心。你只能CAS到头部。因此,当睡眠者列表的头部被标记为唤醒时,您可以 CAS 关闭该头部。如果头部没有醒来,则继续扫描列表并“延迟取消链接”(我创造了这个术语)剩余的唤醒项目。延迟取消链接很简单...只需将上一个项目的下一个指针设置在唤醒项目上即可。即使并发扫描到达了!唤醒的项目,它仍然会到达列表的末尾。后续扫描会看到较短的列表。最后,每当您添加工作或完成工作时,请扫描睡眠列表以查找!唤醒项目。如果消费者在获取一些工作后发现工作仍然存在(.next work!= NULL),则消费者可以扫描休眠列表并向第一个唤醒的线程发出信号。生产者添加工作后,生产者可以扫描睡眠者列表并执行相同的操作。

如果您有广播场景并且无法向单个线程发出信号,那么只需保留休眠线程的计数即可。虽然该计数仍然> 0 中,消费者注意到剩余工作和添加工作的消费者将广播唤醒信号。

在我们的环境中,每个 SMT 有 1 个线程,因此休眠列表永远不会那么大(除非我得到这些新的 128 个并发线程机器之一!)我们在事务的早期生成工作项。在第一秒内,我们可能会生成 10,000 个工作项,并且这种生成会迅速减少。线程在这些工作项上工作几秒钟。因此,我们很少有空闲池上的线程。

你仍然可以使用锁
如果您只有 1 个线程并且很少生成工作...这对您不起作用。在这种情况下,互斥体的性能并不重要,您应该只使用它们。在这种情况下,请在睡眠队列上使用锁。将无锁视为“重要的地方没有锁”。

上一篇文章:
你是说:
有一个工作队列。
有很多消费者线程。
消费者需要拉动工作,如果有工作就去做
消费者线程需要休眠直到有工作为止。

如果是的话,我们只使用原子操作来完成此操作:

工作队列是一个链接列表。还有一个休眠线程的链表。

添加工作: CAS 将列表头添加到新工作中。当添加工作时,我们检查睡眠列表中是否有任何线程。如果有,在添加工作之前,我们将一个睡眠者从睡眠者列表中删除,设置其工作=新工作,然后向睡眠者发出唤醒信号。我们将工作添加到工作队列中。

要消耗工作:将列表的头部CAS到head->next。如果工作列表的头为 NULL,我们将线程 CAS 到睡眠者列表。

一旦线程拥有工作项,该线程必须将工作项的状态 CAS 为 WORK_INPROGRESS 或类似状态。如果失败,则意味着该工作正在由另一个线程执行,因此使用者线程将返回以搜索工作。如果一个线程醒来并有一个工作项,它仍然必须 CAS 状态。

因此,如果添加工作,熟睡的消费者总是会被唤醒并交给工作。 pthread_kill() 总是在 sigwait() 处唤醒线程,因为即使线程在信号之后到达 sigwait,也会收到信号。这解决了线程将自己放入休眠列表但在进入休眠之前收到信号的问题。所发生的一切是线程试图拥有它的 -> 工作(如果有的话)。无法拥有工作或没有工作会将线程发送回消耗启动状态。如果一个线程未能 CAS 到 sleeper 列表,则意味着另一个线程击败了它,或者生产者拉下了 sleeper。为了安全起见,我们让线程表现得就像刚刚被唤醒一样。

我们这样做没有竞争条件,并且有多个生产者和消费者。我们还能够对此进行扩展,以允许线程在各个工作项上休眠。

EDIT:

SIMPLER:
I suggest you don't need a head and tail for your queue. Just have a head. If the head = NULL, the list is empty. Add items to head. Remove items from head. Simpler, fewer CAS ops.

HELPER:
I suggested in the comments that you need to think of a helper scheme to handle the race. In my version of what "lock free" means, it's ok to have rare race conditions if they don't cause problems. I like the extra performance vs having an idle thread sleep a couple ms too long.

Helper ideas. When a consumer grabs work it could check to see if there is a thread in a coma. When a producer adds work, it could look for threads in comas.

So track sleepers. Use a linked list of sleepers. When a thread decides there is no work, it marks itself as !awake and CAS's itself to head of the sleeper list. When a signal is received to wake up, the thread marks self as awake. Then the newly awakened thread cleans up the sleeper list. To clean up a concurrent single linked list, you have to be careful. You can only CAS to the head. So while the head of the sleeper list is marked awake, you can CAS the head off. If the head is not awake, continue to scan the list and "lazy unlink" (I made that term up) the remaining awake items. Lazy unlink is simple...just set next ptr of prev item over the awake item. A concurrent scan will still make it to the end of the list even if it gets to items that are !awake. Subsequent scans see a shorter list. Finally, any time you add work or pull off work, scan the sleeper list for !awake items. If a consumer notices work remains after grabbing some work (.next work != NULL), the consumer can scan sleeper list and signal the first thread that is !awake. After a producer adds work, the producer can scan the sleeper list and do the same.

If you have a broadcast scenario and cant signal a single thread, then just keep a count of asleep threads. While that count is still > 0, a consumer noticing remaining work and a consumer adding work would broadcast the signal to wake up.

In our environment, we have 1 thread per SMT, so the sleeper list can never be that large (well unless I get my hands on one of those new 128 concurrent thread machines!) We generate work items early in a transaction. In the first sec we might generate 10,000 work items, and this production rapidly tapers off. Threads work for a couple sec on those work items. So, we rarely have a thread on the idle pool.

YOU CAN STILL USE LOCKS
If you have 1 thread only and generate work rarely...this wont work for you. In that case the performance of mutexes is of no concern and you should just use them. Use a lock on the sleeper queue in this scenario. Think of lock-free as being "no locks where it counts".

PREVIOUS POST:
Are you saying:
There is a queue of work.
There are many consumer threads.
A consumer needs to pull of work and do it if there is any work
A consumer thread needs to sleep until there is work.

If you are, we do this using only atomic operations this way:

The queue of work is a linked list. There is also a linked list of sleeping threads.

To add work: CAS the head of the list to the new work. When work is added,we check to see if there are any threads on the sleeper list. If there are, before adding the work, we CAS a sleeper off the sleeper list, set its work = the new work, and then signal the sleeper to wake up. The we add the work to the work queue.

To consume work: CAS the head of the list to head->next. If the head of the work list is NULL, we CAS the thread to a list of sleepers.

Once a thread has a work item, the thread must CAS the work item's state to WORK_INPROGRESS or some such. If that fails, it means the work is being performed by another, so the consumer thread goes back to search for work. If a thread wakes up and has a work item, it still has to CAS the state.

So if work is added, a sleeping consumer is always woken up and handed the work. pthread_kill() always wakes a thread at sigwait(), because even If the thread gets to sigwait after the signal, the signal is received. This solves the problem of a thread putting itself on the sleeper list but getting signaled before going to sleep. All that happens is the thread tries to own its ->work if there is one. Failure to own work or not having work sends the thread back to consume-start. If a thread fails to CAS to the sleeper list, it means that either another thread beat it, or that the producer pulled off a sleeper. For safety, we have the thread act as if it were just woken up.

We get no race conditions doing this and have multiple producers and consumers. We also have been able to expand this to allow threads to sleep on individual work items as well.

酒与心事 2024-11-09 16:53:30

.NET 并行扩展:(内置,适用于 .NET 4.0+):

http://blogs.msdn.com/b/pfxteam/archive/2010/01/26/9953725.aspx

StackOverflow 实现人员:

锁定 .net 中的自由构造


Response to clarification in comments:

如果空时的阻塞不忙(等待信号),那么似乎您需要一个计数信号量来等待。

另一种方法可以是使用常规队列,以及原子比较和交换或自旋锁来防止同时访问,
然后,如果消费者线程尝试在队列为空时进入,则锁定二进制信号量,
如果提供者线程在队列为空时尝试进入,则解锁二进制信号量以唤醒所有休眠消费者(并将它们返回到自旋锁,以便多个线程只有在队列中有足够的项目时才能进入)。

例如 // 伪代码

/// Non-blocking lock (busy wait)
void SpinLock()
{
    While (CompareAndExchange(myIntegerLock, -1, 0) != 0)
    {
        // wait
    }
}

void UnSpinLock()
{
    Exchange(myIntegerLock, 0);
}

void AddItem(item)
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    queue.Push(item);

    // Unblock any blocked consumers
    if (queue.Count() == 1)
    {
        semaphore.Increase();
    }

    // End of CAS synchronization block
    UnSpinLock();
}

Item RemoveItem()
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    // If empty then block
    if (queue.Count() == 0)
    {
        // End of CAS synchronization block
        UnSpinLock();

        // Block until queue is not empty
        semaphore.Decrease();

        // Try again (may fail again if there is more than one consumer)
        return RemoveItem();
    }

    result = queue.Pop();

    // End of CAS synchronization block
    UnSpinLock();

    return result;
}

.NET parallel extensions: (Built in, for .NET 4.0+):

http://blogs.msdn.com/b/pfxteam/archive/2010/01/26/9953725.aspx

Someone from StackOverflow's implementation:

Lock free constructs in .net


Response to clarification in comments:

If the blocking when empty is not busy (waits for signal), then it seems like you need a counting-semaphore to wait on.

An alternative approach could be using a regular queue, together with atomic compare and exchange or spin lock to prevent simultaneous access,
then if a consumer thread tries to enter when queue is empty, lock binary semaphore,
if a provider thread tries to enter when queue is empty, unlock binary semaphore to awaken all sleeper consumers (and return them to spin-lock, so that multiple threads can only enter if there are enough items in queue for them).

E.g. // pseudo code

/// Non-blocking lock (busy wait)
void SpinLock()
{
    While (CompareAndExchange(myIntegerLock, -1, 0) != 0)
    {
        // wait
    }
}

void UnSpinLock()
{
    Exchange(myIntegerLock, 0);
}

void AddItem(item)
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    queue.Push(item);

    // Unblock any blocked consumers
    if (queue.Count() == 1)
    {
        semaphore.Increase();
    }

    // End of CAS synchronization block
    UnSpinLock();
}

Item RemoveItem()
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    // If empty then block
    if (queue.Count() == 0)
    {
        // End of CAS synchronization block
        UnSpinLock();

        // Block until queue is not empty
        semaphore.Decrease();

        // Try again (may fail again if there is more than one consumer)
        return RemoveItem();
    }

    result = queue.Pop();

    // End of CAS synchronization block
    UnSpinLock();

    return result;
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文