我可以在等待/信号信号量中切换测试和修改部分吗?

发布于 2024-11-04 08:37:55 字数 1241 浏览 0 评论 0原文

wait()signal() 信号量的经典 none-busy-waiting 版本实现如下。在此版本中,value 可以为负数。

//primitive
wait(semaphore* S)
{
    S->value--;
    if (S->value < 0)
    {
        add this process to S->list;
        block();
    }
}

//primitive
signal(semaphore* S)
{
    S->value++;
    if (S->value <= 0)
    {
        remove a process P from S->list;
        wakeup(P);
    }
}

问题:下面的版本也正确吗?这里我先测试一下,修改一下值。如果您能给我展示一个它不起作用的场景,那就太好了。

//primitive wait().
//If (S->value > 0), the whole function is atomic
//otherise, only if(){} section is atomic
wait(semaphore* S)
{
    if (S->value <= 0)
    {
        add this process to S->list;
        block();
    }
    // here I decrement the value after the previous test and possible blocking
    S->value--;
}

//similar to wait()
signal(semaphore* S)
{
    if (S->list is not empty)
    {
        remove a process P from S->list;
        wakeup(P);
    }
    // here I increment the value after the previous test and possible waking up
    S->value++;
}

编辑

我的动机是想弄清楚我是否可以使用后一个版本来实现互斥,并且没有死锁,没有饥饿。

The classic none-busy-waiting version of wait() and signal() semaphore are implemented as below. In this verson, value can be negative.

//primitive
wait(semaphore* S)
{
    S->value--;
    if (S->value < 0)
    {
        add this process to S->list;
        block();
    }
}

//primitive
signal(semaphore* S)
{
    S->value++;
    if (S->value <= 0)
    {
        remove a process P from S->list;
        wakeup(P);
    }
}

Question: Is the following version also correct? Here I test first and modify the value. It's great if you can show me a scenario where it doesn't work.

//primitive wait().
//If (S->value > 0), the whole function is atomic
//otherise, only if(){} section is atomic
wait(semaphore* S)
{
    if (S->value <= 0)
    {
        add this process to S->list;
        block();
    }
    // here I decrement the value after the previous test and possible blocking
    S->value--;
}

//similar to wait()
signal(semaphore* S)
{
    if (S->list is not empty)
    {
        remove a process P from S->list;
        wakeup(P);
    }
    // here I increment the value after the previous test and possible waking up
    S->value++;
}

Edit:

My motivation is to figure out whether I can use this latter version to achieve mutual exclusion, and no deadlock, no starvation.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

谁的年少不轻狂 2024-11-11 08:37:55

您的修改版本引入了竞争条件:

  • 线程 A: if(S->Value < 0) // Value = 1
  • 线程 B: if(S->Value < 0) // Value = 1
  • 线程 A: S -> 值--; // 值 = 0
  • 线程 B: S->Value--; // Value = -1

两个线程都获取了 count=1 信号量。哎呀。请注意,即使它们是不可抢占的(见下文),也存在另一个问题,但为了完整性,这里讨论原子性以及真正锁定协议的工作原理。

当使用这样的协议时,确定您正在使用的原子原语非常重要。原子原语看起来是即时执行的,而不与任何其他操作交织在一起。你不能只把一个大函数称为原子函数,而是将其称为原子函数。你必须使用其他原子基元以某种方式使其原子化。

大多数 CPU 都提供称为“原子比较和交换”的原语。从这里开始我将其缩写为 cmpxchg。语义如下:

bool cmpxchg(long *ptr, long old, long new) {
    if (*ptr == old) {
        *ptr = new;
        return true;
    } else {
        return false;
    }
}

cmpxchg 未使用此代码实现。它位于 CPU 硬件中,但行为有点像这样,只是原子地。

现在,让我们添加一些额外的有用函数(由其他原语构建):

  • add_waitqueue(waitqueue) - 将进程状态设置为睡眠并将我们添加到等待队列,但继续执行(原子)
  • Schedule() - 切换线程。如果我们处于睡眠状态,我们不会再次运行,直到被唤醒(阻塞)
  • remove_waitqueue(waitqueue) - 从等待队列中删除我们的进程,然后将我们的状态设置为唤醒(如果它还没有)(原子)
  • 内存_屏障( ) - 确保在此点之前的任何逻辑读/写实际上都在该点之前执行,避免令人讨厌的内存排序问题(我们假设所有其他原子原语都带有一个空闲的内存屏障,尽管这并不总是正确的)(CPU/编译器原语)

这是典型的信号量获取例程的外观。它比您的示例复杂一点,因为我已经明确确定了我正在使用的原子操作:

void sem_down(sem *pSem)
{
    while (1) {
        long spec_count = pSem->count;
        read_memory_barrier(); // make sure spec_count doesn't start changing on us! pSem->count may keep changing though
        if (spec_count > 0)
        {
            if (cmpxchg(&pSem->count, spec_count, spec_count - 1)) // ATOMIC
                return; // got the semaphore without blocking
            else
                continue; // count is stale, try again
        } else { // semaphore count is zero
            add_waitqueue(pSem->wqueue); // ATOMIC
            // recheck the semaphore count, now that we're in the waitqueue - it may have changed
            if (pSem->count == 0) schedule(); // NOT ATOMIC
            remove_waitqueue(pSem->wqueue); // ATOMIC
            // loop around again to try to acquire the semaphore
        }
    }
}

您会注意到非零 pSem->count 的实际测试,在现实世界的 semaphore_down 函数中,是由 cmpxchg 完成的。你不能相信任何其他读物;读取该值后,该值可能会立即发生变化。我们根本无法将值检查和值修改分开。

这里的spec_count推测。这很重要。我基本上是在猜测计数会是多少。这是一个很好的猜测,但它只是一个猜测。如果我的猜测错误,cmpxchg 将失败,此时例程必须循环并重试。如果我猜为 0,那么我要么会被叫醒(因为当我在等待队列上时它不再为零),要么我会注意到它在计划测试中不再为零。

您还应该注意,没有可能的方法使包含阻塞操作的函数成为原子的。这是荒谬的。根据定义,原子函数似乎是立即执行的,不与其他任何东西交错。但根据定义,阻塞函数会等待其他事情发生。这是不一致的。同样,任何原子操作都不能在阻塞操作中“拆分”,如您的示例所示。

现在,您可以通过声明函数不可抢占来消除很多这种复杂性。通过使用锁或其他方法,您只需确保信号量代码中一次只有一个线程运行(当然不包括阻塞)。但问题仍然存在。从值 0 开始,其中 C 已将信号量取下两次,然后:

  • 线程 A: if (S->Value < 0) // Value = 0
  • 线程 A: 阻止....
  • 线程 B: if ( S->Value < 0) // Value = 0
  • 线程 B: Block....
  • 线程 C: S->Value++ // value = 1
  • 线程 C: Wakeup(A)
  • (线程 C 调用 signal() (线程 C 调用wait
  • 线程 C: S->Value++ // value = 2
  • 线程 C: Wakeup(B)
  • ())
  • 线程 C: if (S->Value <= 0) // Value = 2
  • 线程 C: S->Value-- // Value = 1
  • // A 和 B 已被唤醒
  • 线程 A: S->Value-- // Value = 0
  • 线程 B: S->Value-- //值= -1

您可能可以通过循环重新检查 S->value 来修复此问题 - 假设您在单处理器计算机上并且您的信号量代码是可抢占的。不幸的是,这些假设在所有桌面操作系统上都是错误的:)

有关真实锁定协议如何工作的更多讨论,您可能会对论文“Fuss、Futexes 和 Furwocks:Linux 中的快速用户级锁定"

Your modified version introduces a race condition:

  • Thread A: if(S->Value < 0) // Value = 1
  • Thread B: if(S->Value < 0) // Value = 1
  • Thread A: S->Value--; // Value = 0
  • Thread B: S->Value--; // Value = -1

Both threads have acquired a count=1 semaphore. Oops. Note that there's another problem even if they're non-preemptible (see below), but for completeness, here's a discussion on atomicity and how real locking protocols work.

When working with protocols like this, it's very important to nail down exactly what atomic primitives you are using. Atomic primitives are such that they seem to execute instantaneously, without being interleaved with any other operations. You cannot just take a big function and call it atomic; you have to make it atomic somehow, using other atomic primitives.

Most CPUs offer a primitive called 'atomic compare and exchange'. I'll abbreviate it cmpxchg from here on. The semantics are like so:

bool cmpxchg(long *ptr, long old, long new) {
    if (*ptr == old) {
        *ptr = new;
        return true;
    } else {
        return false;
    }
}

cmpxchg is not implemented with this code. It is in the CPU hardware, but behaves a bit like this, only atomically.

Now, let's add to this some additional helpful functions (built out of other primitives):

  • add_waitqueue(waitqueue) - Sets our process state to sleeping and adds us to a wait queue, but continues executing (ATOMIC)
  • schedule() - Switch threads. If we're in a sleeping state, we don't run again until awakened (BLOCKING)
  • remove_waitqueue(waitqueue) - removes our process from a wait queue, then sets our state to awakened if it isn't already (ATOMIC)
  • memory_barrier() - ensures that any reads/writes logically before this point actually are performed before this point, avoiding nasty memory ordering issues (we'll assume all other atomic primitives come with a free memory barrier, although this isn't always true) (CPU/COMPILER PRIMITIVE)

Here's how a typical semaphore acquisition routine will look. It's a bit more complex than your example, because I've explicitly nailed down what atomic operations I'm using:

void sem_down(sem *pSem)
{
    while (1) {
        long spec_count = pSem->count;
        read_memory_barrier(); // make sure spec_count doesn't start changing on us! pSem->count may keep changing though
        if (spec_count > 0)
        {
            if (cmpxchg(&pSem->count, spec_count, spec_count - 1)) // ATOMIC
                return; // got the semaphore without blocking
            else
                continue; // count is stale, try again
        } else { // semaphore count is zero
            add_waitqueue(pSem->wqueue); // ATOMIC
            // recheck the semaphore count, now that we're in the waitqueue - it may have changed
            if (pSem->count == 0) schedule(); // NOT ATOMIC
            remove_waitqueue(pSem->wqueue); // ATOMIC
            // loop around again to try to acquire the semaphore
        }
    }
}

You'll note that the actual test for a non-zero pSem->count, in a real-world semaphore_down function, is accomplished by cmpxchg. You can't trust any other read; the value can change an instant after you read the value. We simply can't separate the value check and the value modification.

The spec_count here is speculative. This is important. I'm essentially making a guess at what the count will be. It's a pretty good guess, but it's a guess. cmpxchg will fail if my guess is wrong, at which point the routine has to loop and try again. If I guess 0, then I will either be woken up (as it ceases to be zero while I'm on the waitqueue), or I will notice it's not zero anymore in the schedule test.

You should also note that there is no possible way to make a function that contains a blocking operation atomic. It's nonsensical. Atomic functions, by definition, appear to execute instantaneously, not interleaved with anything else whatsoever. But a blocking function, by definition, waits for something else to happen. This is inconsistent. Likewise, no atomic operation can be 'split up' across a blocking operation, which it is in your example.

Now, you could do away with a lot of this complexity by declaring the function non-preemptable. By using locks or other methods, you simply ensure only one thread is ever running (not including blocking of course) in the semaphore code at a time. But a problem still remains then. Start with a value of 0, where C has taken the semaphore down twice, then:

  • Thread A: if (S->Value < 0) // Value = 0
  • Thread A: Block....
  • Thread B: if (S->Value < 0) // Value = 0
  • Thread B: Block....
  • Thread C: S->Value++ // value = 1
  • Thread C: Wakeup(A)
  • (Thread C calls signal() again)
  • Thread C: S->Value++ // value = 2
  • Thread C: Wakeup(B)
  • (Thread C calls wait())
  • Thread C: if (S->Value <= 0) // Value = 2
  • Thread C: S->Value-- // Value = 1
  • // A and B have been woken
  • Thread A: S->Value-- // Value = 0
  • Thread B: S->Value-- // Value = -1

You could probably fix this with a loop to recheck S->value - again, assuming you are on a single processor machine and your semaphore code is preemptable. Unfortunately, these assumptions are false on all desktop OSes :)

For more discussion on how real locking protocols work, you might be interested in the paper "Fuss, Futexes and Furwocks: Fast Userlevel Locking in Linux"

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文