信号量的多线程问题

发布于 2025-01-18 23:45:34 字数 1186 浏览 1 评论 0原文

我需要有一段代码,该代码只允许基于参数键同时由 1 个线程执行:

    private static readonly ConcurrentDictionary<string, SemaphoreSlim> Semaphores = new();

    private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
    {
        var semaphore = Semaphores.GetOrAdd(valueKey, s => new SemaphoreSlim(1, 1));

        try
        {
            await semaphore.WaitAsync();

            return await valueAction();
        }
        finally
        {
            semaphore.Release(); // Exception here - System.ObjectDisposedException
            if (semaphore.CurrentCount > 0 && Semaphores.TryRemove(valueKey, out semaphore))
            {
                semaphore?.Dispose();
            }
        }
    }

有时我会收到错误:

The semaphore has been disposed. : System.ObjectDisposedException: The semaphore has been disposed.
   at System.Threading.SemaphoreSlim.CheckDispose()
   at System.Threading.SemaphoreSlim.Release(Int32 releaseCount)
   at Project.GetValueWithBlockAsync[TModel](String valueKey, Func`1 valueAction)

我在这里可以想象的所有情况都是线程安全的。请帮忙,我错过了什么案例?

I need to have the piece of code which allowed to execute only by 1 thread at the same time based on parameter key:

    private static readonly ConcurrentDictionary<string, SemaphoreSlim> Semaphores = new();

    private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
    {
        var semaphore = Semaphores.GetOrAdd(valueKey, s => new SemaphoreSlim(1, 1));

        try
        {
            await semaphore.WaitAsync();

            return await valueAction();
        }
        finally
        {
            semaphore.Release(); // Exception here - System.ObjectDisposedException
            if (semaphore.CurrentCount > 0 && Semaphores.TryRemove(valueKey, out semaphore))
            {
                semaphore?.Dispose();
            }
        }
    }

Time to time I got the error:

The semaphore has been disposed. : System.ObjectDisposedException: The semaphore has been disposed.
   at System.Threading.SemaphoreSlim.CheckDispose()
   at System.Threading.SemaphoreSlim.Release(Int32 releaseCount)
   at Project.GetValueWithBlockAsync[TModel](String valueKey, Func`1 valueAction)

All cases that I can imagine here are thread safety. Please help, what case I missed?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

与酒说心事 2025-01-25 23:45:34

您在这里有一个线程竞赛,另一个任务正在尝试获取相同的信号量,并在您版本 - ie另一个线程等待semaphore.waitasync()时获取它。 。针对CurrentCount的检查是一种竞赛条件,并且可以根据时间来进行任何任务。检查TryRemove的检查是无关紧要的,因为竞争性线程已经 删除了信号量 - 毕竟,等待waitaSync()

You have a thread race here, where another task is trying to acquire the same semaphore, and acquires it when you Release - i.e. another thread is awaiting the semaphore.WaitAsync(). The check against CurrentCount is a race condition, and it could go either way depending on timing. The check for TryRemove is irrelevant, as the competing thread already got the semaphore out - it was, after all, awaiting the WaitAsync().

一刻暧昧 2025-01-25 23:45:34

如评论中所述,您在这里有几个种族条件。

  1. 线程1保持锁定,线程2正在等待waitasync()。线程1释放锁,然后在Thread 2之前检查SmemAphore.CurrentCount
  2. 线程1握住锁,将其释放,然后检查Semaphore.currentCount通过。线程2输入getValueWithBlockAsync,呼叫semaphores.getoradd并获取了信号量。线程1然后调用信号量。tryremove并递给信号量。

您确实需要锁定从Semaphores中删除条目的决定 - 这是没有办法的。您也没有一种方法来跟踪是否有任何线程从semaphores中获取了信号量(并且当前正在等待它,或者还没有达到这一点)。

一种方法是做这样的事情:拥有一个在每个人之间共享的锁,但是只有在获取/创建信号量并决定是否处置时才需要。我们手动跟踪目前有多少线程对特定信号量有兴趣。当线程发布信号量时,它将获取共享锁,以检查其他人当前是否对该信号量有兴趣,并只有在没有其他人的情况下才能将其处置。

private static readonly object semaphoresLock = new();
private static readonly Dictionary<string, State> semaphores = new();

private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
{
    State state;
    lock (semaphoresLock)
    {
        if (!semaphores.TryGetValue(valueKey, out state))
        {
            state = new();
            semaphores[valueKey] = state;
        }
        
        state.Count++;
    }

    try
    {
        await state.Semaphore.WaitAsync();

        return await valueAction();
    }
    finally
    {
        state.Semaphore.Release();
        lock (semaphoresLock)
        {
            state.Count--;
            if (state.Count == 0)
            {
                semaphores.Remove(valueKey);
                state.Semaphore.Dispose();
            }
        }
    }
}

private class State
{
    public int Count { get; set; }
    public SemaphoreSlim Semaphore { get; } = new(1, 1);
}

当然,另一个选择是让信号量增长。也许您有定期操作可以进行并清除任何未使用的东西,但是这当然需要受到保护,以确保线程不会突然对被清除的信号量感兴趣。

As discussed in the comments, you have a couple of race conditions here.

  1. Thread 1 holds the lock and Thread 2 is waiting on WaitAsync(). Thread 1 releases the lock, and then checks semaphore.CurrentCount, before Thread 2 is able to acquire it.
  2. Thread 1 holds the lock, releases it, and checks semaphore.CurrentCount which passes. Thread 2 enters GetValueWithBlockAsync, calls Semaphores.GetOrAdd and fetches the semaphore. Thread 1 then calls Semaphores.TryRemove and diposes the semaphore.

You really need locking around the decision to remove an entry from Semaphores -- there's no way around this. You also don't have a way of tracking whether any threads have fetched a semaphore from Semaphores (and are either currently waiting on it, or haven't yet got to that point).

One way is to do something like this: have a lock which is shared between everyone, but which is only needed when fetching/creating a semaphore, and deciding whether to dispose it. We manually keep track of how many threads currently have an interest in a particular semaphore. When a thread has released the semaphore, it then acquires the shared lock to check whether anyone else currently has an interest in that semaphore, and disposes it only if noone else does.

private static readonly object semaphoresLock = new();
private static readonly Dictionary<string, State> semaphores = new();

private async Task<TModel> GetValueWithBlockAsync<TModel>(string valueKey, Func<Task<TModel>> valueAction)
{
    State state;
    lock (semaphoresLock)
    {
        if (!semaphores.TryGetValue(valueKey, out state))
        {
            state = new();
            semaphores[valueKey] = state;
        }
        
        state.Count++;
    }

    try
    {
        await state.Semaphore.WaitAsync();

        return await valueAction();
    }
    finally
    {
        state.Semaphore.Release();
        lock (semaphoresLock)
        {
            state.Count--;
            if (state.Count == 0)
            {
                semaphores.Remove(valueKey);
                state.Semaphore.Dispose();
            }
        }
    }
}

private class State
{
    public int Count { get; set; }
    public SemaphoreSlim Semaphore { get; } = new(1, 1);
}

The other option, of course, is to let Semaphores grow. Maybe you have a periodic operation to go through and clear out anything which isn't being used, but this will of course need to be protected to ensure that a thread doesn't suddenly become interested in a semaphore which is being cleared up.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文