有助于文件锁定的实用程序 - 需要专家提示

发布于 2024-09-02 22:38:30 字数 4375 浏览 3 评论 0原文

我编写了一个文件的子类,a)提供了方便锁定它的方法(使用 fcntl,因此它只支持 unix,但这对我来说没问题 atm)和 b)在读取或写入时断言文件已被适当锁定。

现在我不是这些东西的专家(我刚刚读过一篇相关论文 [de])并希望得到一些反馈:它是否安全,是否存在竞争条件,是否还有其他可以做得更好的事情……这是代码:

from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN, LOCK_NB

class LockedFile(file):
    """
    A wrapper around `file` providing locking. Requires a shared lock to read
    and an exclusive lock to write.

    Main differences:
     * Additional methods: lock_ex, lock_sh, unlock
     * Refuse to read when not locked, refuse to write when not locked
       exclusively (an AssertionError is raised, also under ``python -O``).
     * mode cannot be `w` since then the file would be truncated before
       it could be locked.

    You have to lock the file yourself, it won't be done for you implicitly.
    Only you know what lock you need.

    NOTE(review): the locks are taken with `fcntl.flock`, which is documented
    as advisory -- presumably code that opens the same path with a plain
    `open()` is not held back by them; confirm against the flock(2) manual.

    Example usage::
        def get_config():
            f = LockedFile(CONFIG_FILENAME, 'r')
            try:
                f.lock_sh()
                return parse_ini(f.read())
            finally:
                f.close()

        def set_config(key, value):
            f = LockedFile(CONFIG_FILENAME, 'r+')
            try:
                f.lock_ex()
                config = parse_ini(f.read())
                config[key] = value
                # read() left the position at EOF and truncate() cuts at
                # the current position, so rewind first.
                f.seek(0)
                f.truncate()
                f.write(make_ini(config))
            finally:
                f.close()
    """

    def __init__(self, name, mode='r', *args, **kwargs):
        """
        Open `name` like `file` does, starting out unlocked.

        :param name: Path of the file to open.
        :param mode: Any `file` mode that does not contain `w`.
        :raises ValueError: If `mode` contains `w`.
        """
        if 'w' in mode:
            raise ValueError('Cannot open file in `w` mode')

        super(LockedFile, self).__init__(name, mode, *args, **kwargs)

        # Lock currently held through this object: 'sh', 'ex' or None.
        self.locked = None

    def lock_sh(self, **kwargs):
        """
        Acquire a shared lock on the file. If the file is already locked
        exclusively, do nothing.

        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        :param nonblocking: Don't wait for the lock to be available.
        """
        if self.locked == 'ex':
            # Asking for a shared lock now would implicitly give up the
            # exclusive one, so keep it and just report the status.
            return self.locked
        return self._lock(LOCK_SH, **kwargs)

    def lock_ex(self, **kwargs):
        """
        Acquire an exclusive lock on the file.

        NOTE(review): when a shared lock is already held this converts it;
        flock(2) does not promise that such a conversion is atomic, so data
        read under the shared lock should presumably be re-read -- confirm.

        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        :param nonblocking: Don't wait for the lock to be available.
        """
        return self._lock(LOCK_EX, **kwargs)

    def unlock(self):
        """
        Release all locks on the file.
        Flushes if there was an exclusive lock.

        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        """
        if self.locked == 'ex':
            # Push buffered writes out while the exclusive lock still holds.
            self.flush()
        return self._lock(LOCK_UN)

    def _lock(self, mode, nonblocking=False):
        """
        Apply the flock operation `mode` and record the resulting status.

        :param mode: One of LOCK_SH, LOCK_EX, LOCK_UN.
        :param nonblocking: Add LOCK_NB to the operation when true.
        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        """
        operation = (mode | LOCK_NB) if nonblocking else mode
        # The status is only updated after flock() returned, so a failed
        # call leaves `self.locked` untouched.
        flock(self, operation)
        previous = self.locked
        self.locked = {LOCK_SH: 'sh', LOCK_EX: 'ex', LOCK_UN: None}[mode]
        return previous

    def _assert_read_lock(self):
        """Raise AssertionError unless a shared or exclusive lock is held."""
        # Explicit raise instead of `assert`: assert statements are removed
        # when Python runs with -O, which would disable the check.
        if not self.locked:
            raise AssertionError("File is not locked")

    def _assert_write_lock(self):
        """Raise AssertionError unless the exclusive lock is held."""
        if self.locked != 'ex':
            raise AssertionError("File is not locked exclusively")


    def read(self, *args):
        """Like `file.read`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).read(*args)

    def readline(self, *args):
        """Like `file.readline`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).readline(*args)

    def readlines(self, *args):
        """Like `file.readlines`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).readlines(*args)

    def xreadlines(self, *args):
        """Like `file.xreadlines`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).xreadlines(*args)

    def __iter__(self):
        """Like `file.__iter__`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).__iter__()

    def next(self):
        """Like `file.next`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).next()


    def write(self, *args):
        """Like `file.write`, but requires the exclusive lock."""
        self._assert_write_lock()
        return super(LockedFile, self).write(*args)

    def writelines(self, *args):
        """Like `file.writelines`, but requires the exclusive lock."""
        self._assert_write_lock()
        return super(LockedFile, self).writelines(*args)

    def flush(self):
        """Like `file.flush`, but requires the exclusive lock."""
        self._assert_write_lock()
        return super(LockedFile, self).flush()

    def truncate(self, *args):
        """Like `file.truncate`, but requires the exclusive lock."""
        self._assert_write_lock()
        return super(LockedFile, self).truncate(*args)

    def close(self):
        """
        Release any lock (flushing first if it was exclusive) and close.

        Calling it again on an already closed file skips the unlock step,
        because flock() cannot be applied to a closed file.
        """
        try:
            if not self.closed:
                self.unlock()
        finally:
            # Close even when unlocking failed so the descriptor is not
            # leaked; the unlock error (if any) still propagates.
            result = super(LockedFile, self).close()
        return result

(文档字符串中的示例也是我当前的使用场景)

感谢您阅读到这里,甚至可能回答 :)

I've written a subclass of file that a) provides methods to conveniently lock it (using fcntl, so it only supports unix, which is however OK for me atm) and b) when reading or writing asserts that the file is appropriately locked.

Now I'm not an expert at such stuff (I've just read one paper [de] about it) and would appreciate some feedback: Is it secure, are there race conditions, are there other things that could be done better … Here is the code:

from fcntl import flock, LOCK_EX, LOCK_SH, LOCK_UN, LOCK_NB

class LockedFile(file):
    """
    A wrapper around `file` providing locking. Requires a shared lock to read
    and an exclusive lock to write.

    Main differences:
     * Additional methods: lock_ex, lock_sh, unlock
     * Refuse to read when not locked, refuse to write when not locked
       exclusively (an AssertionError is raised, also under ``python -O``).
     * mode cannot be `w` since then the file would be truncated before
       it could be locked.

    You have to lock the file yourself, it won't be done for you implicitly.
    Only you know what lock you need.

    NOTE(review): the locks are taken with `fcntl.flock`, which is documented
    as advisory -- presumably code that opens the same path with a plain
    `open()` is not held back by them; confirm against the flock(2) manual.

    Example usage::
        def get_config():
            f = LockedFile(CONFIG_FILENAME, 'r')
            try:
                f.lock_sh()
                return parse_ini(f.read())
            finally:
                f.close()

        def set_config(key, value):
            f = LockedFile(CONFIG_FILENAME, 'r+')
            try:
                f.lock_ex()
                config = parse_ini(f.read())
                config[key] = value
                # read() left the position at EOF and truncate() cuts at
                # the current position, so rewind first.
                f.seek(0)
                f.truncate()
                f.write(make_ini(config))
            finally:
                f.close()
    """

    def __init__(self, name, mode='r', *args, **kwargs):
        """
        Open `name` like `file` does, starting out unlocked.

        :param name: Path of the file to open.
        :param mode: Any `file` mode that does not contain `w`.
        :raises ValueError: If `mode` contains `w`.
        """
        if 'w' in mode:
            raise ValueError('Cannot open file in `w` mode')

        super(LockedFile, self).__init__(name, mode, *args, **kwargs)

        # Lock currently held through this object: 'sh', 'ex' or None.
        self.locked = None

    def lock_sh(self, **kwargs):
        """
        Acquire a shared lock on the file. If the file is already locked
        exclusively, do nothing.

        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        :param nonblocking: Don't wait for the lock to be available.
        """
        if self.locked == 'ex':
            # Asking for a shared lock now would implicitly give up the
            # exclusive one, so keep it and just report the status.
            return self.locked
        return self._lock(LOCK_SH, **kwargs)

    def lock_ex(self, **kwargs):
        """
        Acquire an exclusive lock on the file.

        NOTE(review): when a shared lock is already held this converts it;
        flock(2) does not promise that such a conversion is atomic, so data
        read under the shared lock should presumably be re-read -- confirm.

        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        :param nonblocking: Don't wait for the lock to be available.
        """
        return self._lock(LOCK_EX, **kwargs)

    def unlock(self):
        """
        Release all locks on the file.
        Flushes if there was an exclusive lock.

        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        """
        if self.locked == 'ex':
            # Push buffered writes out while the exclusive lock still holds.
            self.flush()
        return self._lock(LOCK_UN)

    def _lock(self, mode, nonblocking=False):
        """
        Apply the flock operation `mode` and record the resulting status.

        :param mode: One of LOCK_SH, LOCK_EX, LOCK_UN.
        :param nonblocking: Add LOCK_NB to the operation when true.
        :returns: Lock status from before the call (one of 'sh', 'ex', None).
        """
        operation = (mode | LOCK_NB) if nonblocking else mode
        # The status is only updated after flock() returned, so a failed
        # call leaves `self.locked` untouched.
        flock(self, operation)
        previous = self.locked
        self.locked = {LOCK_SH: 'sh', LOCK_EX: 'ex', LOCK_UN: None}[mode]
        return previous

    def _assert_read_lock(self):
        """Raise AssertionError unless a shared or exclusive lock is held."""
        # Explicit raise instead of `assert`: assert statements are removed
        # when Python runs with -O, which would disable the check.
        if not self.locked:
            raise AssertionError("File is not locked")

    def _assert_write_lock(self):
        """Raise AssertionError unless the exclusive lock is held."""
        if self.locked != 'ex':
            raise AssertionError("File is not locked exclusively")


    def read(self, *args):
        """Like `file.read`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).read(*args)

    def readline(self, *args):
        """Like `file.readline`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).readline(*args)

    def readlines(self, *args):
        """Like `file.readlines`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).readlines(*args)

    def xreadlines(self, *args):
        """Like `file.xreadlines`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).xreadlines(*args)

    def __iter__(self):
        """Like `file.__iter__`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).__iter__()

    def next(self):
        """Like `file.next`, but requires a lock."""
        self._assert_read_lock()
        return super(LockedFile, self).next()


    def write(self, *args):
        """Like `file.write`, but requires the exclusive lock."""
        self._assert_write_lock()
        return super(LockedFile, self).write(*args)

    def writelines(self, *args):
        """Like `file.writelines`, but requires the exclusive lock."""
        self._assert_write_lock()
        return super(LockedFile, self).writelines(*args)

    def flush(self):
        """Like `file.flush`, but requires the exclusive lock."""
        self._assert_write_lock()
        return super(LockedFile, self).flush()

    def truncate(self, *args):
        """Like `file.truncate`, but requires the exclusive lock."""
        self._assert_write_lock()
        return super(LockedFile, self).truncate(*args)

    def close(self):
        """
        Release any lock (flushing first if it was exclusive) and close.

        Calling it again on an already closed file skips the unlock step,
        because flock() cannot be applied to a closed file.
        """
        try:
            if not self.closed:
                self.unlock()
        finally:
            # Close even when unlocking failed so the descriptor is not
            # leaked; the unlock error (if any) still propagates.
            result = super(LockedFile, self).close()
        return result

(the example in the docstring is also my current use case for this)

Thanks for having read until down here, and possibly even answering :)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

夏了南城 2024-09-09 22:38:30

我也不是专家,但有一件事你应该改变,还有其他一些需要考虑:

首先,以这种方式使用 assert 是一个坏主意:如果 python 使用 -O 或 -OO 运行,断言会被关闭,您的两个 _assert_*_lock() 方法将永远不会报错。

其次——你需要一些测试。 :) 我冒昧地添加了一个自定义错误类并编写了几个测试。前四次通过,最后一次失败;这就提出了一个问题,如果文件正常打开(如其他一些非 LockedFile 对象)并向其中写入数据,会发生什么情况?

哦,最后——LockableFile 这个名字对我来说更有意义,因为文件可以处于解锁状态。

以下是我所做的更改:

class LockedFileError(OSError):  # might want IOError instead
    """Error expected when a LockedFile is used without the needed lock."""

if __name__ == '__main__':
    import unittest
    import tempfile
    import shutil
    import os

    class TestLockedFile(unittest.TestCase):
        """Checks which reads and writes LockedFile allows per lock state."""

        def setUp(self):
            # Every test gets its own scratch directory with a small ini file.
            self.dir = tempfile.mkdtemp()
            self.testfile = testfile = os.path.join(self.dir, 'opened.txt')
            # Handles opened by the test; tearDown closes them so no
            # descriptor (and no lock held through it) outlives the test.
            self.opened = []
            temp = open(testfile, 'w')
            try:
                temp.write('[global]\nsetting1=99\nsetting2=42\n')
            finally:
                temp.close()

        def tearDown(self):
            # Close in reverse order of opening, then drop the directory.
            while self.opened:
                self.opened.pop().close()
            shutil.rmtree(self.dir, ignore_errors=True)

        def _track(self, handle):
            """Register `handle` for closing in tearDown and return it."""
            self.opened.append(handle)
            return handle

        def test_01(self):
            "writes fail if not locked exclusively"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r+'))
            self.assertRaises(LockedFileError, temp.write, 'arbitrary data')
            temp.lock_sh()
            self.assertRaises(LockedFileError, temp.write, 'arbitrary data')

        def test_02(self):
            "reads fail if not locked"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r'))
            self.assertRaises(LockedFileError, temp.read)

        def test_03(self):
            "writes succeed if locked exclusively"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r+'))
            temp.lock_ex()
            temp.write('arbitrary data\n')

        def test_04(self):
            "reads succeed if locked"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r'))
            temp.lock_sh()
            temp.readline()
            temp.lock_ex()
            temp.readline()

        def test_05(self):
            "other writes fail if locked exclusively"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r'))
            temp.lock_ex()
            testing = self._track(open(testfile, 'r+'))
            # not sure if this should be OSError, IOError, or something else...
            # NOTE(review): flock() locks are documented as advisory, so a
            # plain open() handle presumably is not stopped by them; this
            # expectation is kept to illustrate that open question.
            self.assertRaises(OSError, testing.write, 'this should fail\n')

    unittest.main()

应该编写更多的测试来涵盖 LockedFile 与读取、写入以及尝试读取/写入同一实际文件的其他非 LockedFile 文件对象的各种组合。

I am also not an expert, but there is one thing you should change, and a couple others to consider:

First off, using assert this way is a bad idea: if python is run with -O or -OO, asserts are turned off and your two _assert_*_lock() methods would never raise.

Second -- you need some tests. :) I took the liberty of adding a custom error class and writing a couple of tests. The first four pass, the last fails; which brings up the question, what should happen if the file is opened normally (as some other non-LockedFile object) and data is written to it?

Oh, and lastly -- the name LockableFile makes more sense to me, since the file can be in an unlocked state.

Here are the changes I made:

class LockedFileError(OSError):  # might want IOError instead
    """Error expected when a LockedFile is used without the needed lock."""

if __name__ == '__main__':
    import unittest
    import tempfile
    import shutil
    import os

    class TestLockedFile(unittest.TestCase):
        """Checks which reads and writes LockedFile allows per lock state."""

        def setUp(self):
            # Every test gets its own scratch directory with a small ini file.
            self.dir = tempfile.mkdtemp()
            self.testfile = testfile = os.path.join(self.dir, 'opened.txt')
            # Handles opened by the test; tearDown closes them so no
            # descriptor (and no lock held through it) outlives the test.
            self.opened = []
            temp = open(testfile, 'w')
            try:
                temp.write('[global]\nsetting1=99\nsetting2=42\n')
            finally:
                temp.close()

        def tearDown(self):
            # Close in reverse order of opening, then drop the directory.
            while self.opened:
                self.opened.pop().close()
            shutil.rmtree(self.dir, ignore_errors=True)

        def _track(self, handle):
            """Register `handle` for closing in tearDown and return it."""
            self.opened.append(handle)
            return handle

        def test_01(self):
            "writes fail if not locked exclusively"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r+'))
            self.assertRaises(LockedFileError, temp.write, 'arbitrary data')
            temp.lock_sh()
            self.assertRaises(LockedFileError, temp.write, 'arbitrary data')

        def test_02(self):
            "reads fail if not locked"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r'))
            self.assertRaises(LockedFileError, temp.read)

        def test_03(self):
            "writes succeed if locked exclusively"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r+'))
            temp.lock_ex()
            temp.write('arbitrary data\n')

        def test_04(self):
            "reads succeed if locked"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r'))
            temp.lock_sh()
            temp.readline()
            temp.lock_ex()
            temp.readline()

        def test_05(self):
            "other writes fail if locked exclusively"
            testfile = self.testfile
            temp = self._track(LockedFile(testfile, 'r'))
            temp.lock_ex()
            testing = self._track(open(testfile, 'r+'))
            # not sure if this should be OSError, IOError, or something else...
            # NOTE(review): flock() locks are documented as advisory, so a
            # plain open() handle presumably is not stopped by them; this
            # expectation is kept to illustrate that open question.
            self.assertRaises(OSError, testing.write, 'this should fail\n')

    unittest.main()

Many more tests should be written to cover the various combinations of a LockedFile with reads, writes, and other non-LockedFile file objects trying to read/write to the same actual file.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文