动态分配和销毁互斥锁​​?

发布于 2024-10-25 02:54:54 字数 1447 浏览 2 评论 0原文

我有一个基于 Eventlet 构建的应用程序。

我正在尝试编写一个像样的装饰器来同步跨线程对某些方法的调用。

装饰器目前看起来像这样:

_semaphores_semaphore = semaphore.Semaphore()
_semaphores = {}

def synchronized(name):
    def wrap(f):
        def inner(*args, **kwargs):
            # Grab the lock protecting _semaphores.
            with _semaphores_semaphore:
                # If the named semaphore does not yet exist, create it.
                if name not in _semaphores:
                    _semaphores[name] = semaphore.Semaphore()
                sem = _semaphores[name]

            with sem:
                return f(*args, **kwargs)

这工作正常,对我来说看起来不错并且线程安全,尽管整个线程安全和锁定业务对我来说可能有点生疏。

问题在于,应用程序中其他地方对信号量的特定现有使用(我想使用此装饰器将其转换为该信号量)会动态创建这些信号量:基于用户输入,它必须创建一个文件。它在字典中检查是否已有该文件的信号量,如果没有,则创建一个信号量并锁定它。一旦完成并释放了锁,它就会检查它是否再次被锁定(同时被另一个进程锁定),如果没有,它就会删除信号量。该代码是在绿色线程的假设下编写的,并且在该上下文中是安全的,但如果我想将其转换为使用我的装饰器,这是我无法解决的。

如果我不关心清理可能永远不会再次使用的信号量(可能有数十万个),我很好。如果我确实想清理它们,我不知道该怎么做。

要删除信号量,显然我需要持有 _semaphores_semaphore,因为我正在操纵 _semaphores 字典,但我也必须对特定信号量做一些事情,但我能想到的一切似乎都很有趣: * 在“with sem:”块内,我可以从 _semaphores 获取 _semaphores_semaphore 和 sem。然而,其他线程可能会被阻塞等待(在“with sem:”),如果一个新线程出现想要接触相同的资源,它不会在 _semaphores 中找到相同的信号量,而是创建一个新的信号量 = >失败。 我可以通过检查 sem 的平衡来稍微改进这一点,看看是否有另一个线程已经在等待我释放它。如果有,请保留它,如果没有,请删除它。这样,最后一个等待对资源执行操作的线程将删除它。但是,如果线程刚刚离开“with _semaphores_semaphore:”块,但尚未到达“with sem:”,我会遇到与之前相同的问题=>失败。

感觉就像我错过了一些明显的东西,但我无法弄清楚它是什么。

I have an application that's built on top of Eventlet.

I'm trying to write a decent decorator for synchronizing calls to certain methods across threads.

The decorator currently looks something like this:

_semaphores_semaphore = semaphore.Semaphore()
_semaphores = {}

def synchronized(name):
    def wrap(f):
        def inner(*args, **kwargs):
            # Grab the lock protecting _semaphores.
            with _semaphores_semaphore:
                # If the named semaphore does not yet exist, create it.
                if name not in _semaphores:
                    _semaphores[name] = semaphore.Semaphore()
                sem = _semaphores[name]

            with sem:
                return f(*args, **kwargs)

This works fine, and looks nice and thread safe to me, although this whole thread safety and locking business might be a bit rusty for me.

The problem is that a specific, existing use of semaphores elsewhere in the application, which I'm wanting to convert to using this decorator, creates these semaphores on the fly: Based on user input, it has to create a file. It checks in a dict whether it already has a semaphore for this file, if not, it creates one, and locks it. Once it's done and has released the lock, it checks if it's been locked again (by another process in the mean time), and if not, it deletes the semaphore. This code is written with the assumption of green threads and is safe in that context, but if I want to convert it to use my decorator, and this is what I can't work out.

If I don't care about cleaning up the possibly-never-to-be-used-again semaphores (there could be hundreds of thousands of these), I'm fine. If I do want to clean them up, I'm not sure what to do.

To delete the semaphore, it seems obvious that I need to be holding the _semaphores_semaphore, since I'm manipulating the _semaphores dict, but I have to do something with the specific semaphore, too, but everything I can think of seems to be racy:
* While inside the "with sem:" block, I could grab the _semaphores_semaphore and sem from _semaphores. However, other threads might be blocked waiting for it (at "with sem:"), and if a new thread comes along wanting to touch the same resource, it will not find the same semaphore in _semaphores, but instead create a new one => fail.
I could improve this slightly by checking the balance of sem to see if another thread is already waiting for me to release it. If so, leave it alone, if not, delete it. This way, the last thread waiting to act on the resource will delete it. However, if a thread has just left the "with _semaphores_semaphore:" block, but hasn't yet made it to "with sem:", I have the same problem as before => fail.

It feels like I'm missing something obvious, but I can't work out what it is.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

携君以终年 2024-11-01 02:54:54

我想你也许可以用读写锁来解决这个问题。 _semaphores 字典上的共享独占锁。
这是未经测试的代码,用于展示原理。 RWLock 实现可以在例如 http: //code.activestate.com/recipes/413393-multiple-reader-one-writer-mrow-resource-locking/

_semaphores_rwlock = RWLock()
_semaphores = {}

def synchronized(name):
    def wrap(f):
        def inner(*args, **kwargs):
            lock = _semaphores_rwlock.reader()
            # If the named semaphore does not yet exist, create it.
            if name not in _semaphores:
                lock = _semaphores_rwlock.writer()
                _semaphores[name] = semaphore.Semaphore()

            sem = _semaphores[name]

            with sem:
                retval = f(*args, **kwargs)
            lock.release()
            return retval

当你想清理时,你可以这样做:

wlock = _semaphores_rwlock.writer() #this might take a while; it waits for all readers to release
cleanup(_semaphores)
wlock.release()

I think you might be able to solve it with a reader-writer lock aka. shared-exclusive lock on the _semaphores dict.
This is untested code, to show the principle. An RWLock implementation can be found in e.g. http://code.activestate.com/recipes/413393-multiple-reader-one-writer-mrow-resource-locking/

_semaphores_rwlock = RWLock()
_semaphores = {}

def synchronized(name):
    def wrap(f):
        def inner(*args, **kwargs):
            lock = _semaphores_rwlock.reader()
            # If the named semaphore does not yet exist, create it.
            if name not in _semaphores:
                lock = _semaphores_rwlock.writer()
                _semaphores[name] = semaphore.Semaphore()

            sem = _semaphores[name]

            with sem:
                retval = f(*args, **kwargs)
            lock.release()
            return retval

When you want to clean up you do:

wlock = _semaphores_rwlock.writer() #this might take a while; it waits for all readers to release
cleanup(_semaphores)
wlock.release()
も星光 2024-11-01 02:54:54

mchro 的答案对我不起作用,因为每当一个线程需要创建新信号量时,它就会阻止单个信号量上的所有线程。

我想出的答案是使用 _semaphores 保留两个事务之间的占用者计数器(这两个事务都是在同一个互斥体后面完成的):

A: get semaphore
A1: dangerzone
B: with sem: block etc
C: cleanup semaphore

问题是知道 A 之间有多少人C。信号量的计数器不会告诉您这一点,因为有人可能在 A1 中。答案是在 _semaphores 中为每个信号量保留一个进入者计数器,在 A 处递增它,在 C 处递减它,如果是如果为 0,则您知道 A-C 中没有其他人具有相同的密钥,您可以安全地删除它。

mchro's answer didn't work for me since it blocks all threads on a single semaphore whenever one thread needs to create a new semaphore.

The answer that I came up with is to keep counters of occupants between the two transactions with _semaphores (which are both done behind the same mutex):

A: get semaphore
A1: dangerzone
B: with sem: block etc
C: cleanup semaphore

The problem is knowing how many people are between A and C. The counter of the semaphore doesn't tell you that, since someone may be in A1. The answer is to keep a counter of entrants along with each semaphore in _semaphores, increment it at A, decrement it at C, and if it's at 0 then you know that there's no-one else in A-C with the same key and you can safely delete it.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文