Python 中的持久记忆

发布于 2025-01-06 14:13:49 字数 4585 浏览 0 评论 0原文

我有一个昂贵的函数,它接受并返回少量数据(一些整数和浮点数)。我已经memoized这个函数,但我想让备忘录持久化。已经有几个与此相关的线程,但我不确定某些建议方法的潜在问题,并且我有一些相当具体的要求:

  • 我肯定会同时使用多个线程和进程中的函数(都使用 < code>multiprocessing 和来自单独的 python 脚本)
  • 我不需要从这个 python 函数外部读取或写入备忘录
  • 我并不担心备忘录在极少数情况下被损坏(比如拔掉插头或意外写入到文件而不锁定它),因为它不是 重建起来很昂贵(通常需要 10-20 分钟),但我希望它不会因为异常而损坏,或者手动终止 python 进程(我不知道这有多现实
  • )强烈喜欢不需要大型外部库的解决方案,因为我在一台机器上的硬盘空间非常有限,我将在其上运行代码
  • 我对跨平台代码有较弱的偏好,但我可能只会使用Linux 上的这个

此线程讨论了 shelve 模块,该模块显然不是进程安全的。其中两个答案建议使用 fcntl.flock 来锁定搁置文件。然而,此线程中的一些回复似乎表明这充满了问题 - 但我不太确定它们是什么。听起来好像这仅限于 Unix(尽管显然 Windows 有一个等效的名为 msvcrt.locking),并且锁定只是“建议” - 即,它不会阻止我意外写入文件未经检查就被锁定。还有其他潜在的问题吗?写入文件的副本并作为最后一步替换主副本是否可以降低损坏的风险?

看起来 dbm 模块 不会比搁置更好。我快速浏览了 sqlite3 ,但这似乎有点矫枉过正目的。 此线程这个提到了几个第三方库,包括ZODB,但是有很多选择,而且对于这项任务来说,它们都显得过于庞大和复杂。

有人有什么建议吗?

更新:下面提到了 IncPy,它看起来确实很有趣。不幸的是,我不想回到Python 2.6(我实际上使用的是3.2),而且看起来与C库一起使用有点尴尬(我大量使用numpy和scipy等)。

kindall 的另一个想法很有启发性,但我认为将其适应多个进程会有点困难 - 我认为用文件锁定或数据库替换队列是最简单的。

再次查看 ZODB,它看起来确实非常适合该任务,但我确实想避免使用任何其他库。我仍然不完全确定仅使用flock的所有问题是什么 - 我想一个大问题是如果进程在写入文件时或释放锁之前终止?

所以,我采纳了synthesizerpatel的建议并选择了sqlite3。如果有人感兴趣,我决定对 dict 进行直接替换,将其条目作为泡菜存储在数据库中(我不费心将任何内容保留在内存中,因为数据库访问和泡菜速度足够快与我正在做的其他事情相比)。我确信有更有效的方法可以做到这一点(并且我不知道我是否仍然存在并发问题),但这里是代码:

from collections import MutableMapping
import sqlite3
import pickle


class PersistentDict(MutableMapping):
    def __init__(self, dbpath, iterable=None, **kwargs):
        self.dbpath = dbpath
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'create table if not exists memo '
                '(key blob primary key not null, value blob not null)'
            )
        if iterable is not None:
            self.update(iterable)
        self.update(kwargs)

    def encode(self, obj):
        return pickle.dumps(obj)

    def decode(self, blob):
        return pickle.loads(blob)

    def get_connection(self):
        return sqlite3.connect(self.dbpath)

    def  __getitem__(self, key):
        key = self.encode(key)
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select value from memo where key=?',
                (key,)
            )
            value = cursor.fetchone()
        if value is None:
            raise KeyError(key)
        return self.decode(value[0])

    def __setitem__(self, key, value):
        key = self.encode(key)
        value = self.encode(value)
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'insert or replace into memo values (?, ?)',
                (key, value)
            )

    def __delitem__(self, key):
        key = self.encode(key)
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select count(*) from memo where key=?',
                (key,)
            )
            if cursor.fetchone()[0] == 0:
                raise KeyError(key)
            cursor.execute(
                'delete from memo where key=?',
                (key,)
            )

    def __iter__(self):
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select key from memo'
            )
            records = cursor.fetchall()
        for r in records:
            yield self.decode(r[0])

    def __len__(self):
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select count(*) from memo'
            )
            return cursor.fetchone()[0]

I have an expensive function that takes and returns a small amount of data (a few integers and floats). I have already memoized this function, but I would like to make the memo persistent. There are already a couple of threads relating to this, but I'm unsure about potential issues with some of the suggested approaches, and I have some fairly specific requirements:

  • I will definitely use the function from multiple threads and processes simultaneously (both using multiprocessing and from separate python scripts)
  • I will not need read or write access to the memo from outside this python function
  • I am not that concerned about the memo being corrupted on rare occasions (like pulling the plug or accidentally writing to the file without locking it) as it isn't that expensive to rebuild (typically 10-20 minutes) but I would prefer if it would not be corrupted because of exceptions, or manually terminating a python process (I don't know how realistic that is)
  • I would strongly prefer solutions that don't require large external libraries as I have a severely limited amount of hard disk space on one machine I will be running the code on
  • I have a weak preference for cross-platform code, but I will likely only use this on Linux

This thread discusses the shelve module, which is apparently not process-safe. Two of the answers suggest using fcntl.flock to lock the shelve file. Some of the responses in this thread, however, seem to suggest that this is fraught with problems - but I'm not exactly sure what they are. It sounds as though this is limited to Unix (though apparently Windows has an equivalent called msvcrt.locking), and the lock is only 'advisory' - i.e., it won't stop me from accidentally writing to the file without checking it is locked. Are there any other potential problems? Would writing to a copy of the file, and replacing the master copy as a final step, reduce the risk of corruption?

It doesn't look as though the dbm module will do any better than shelve. I've had a quick look at sqlite3, but it seems a bit overkill for this purpose. This thread and this one mention several 3rd party libraries, including ZODB, but there are a lot of choices, and they all seem overly large and complicated for this task.

Does anyone have any advice?

UPDATE: kindall mentioned IncPy below, which does look very interesting. Unfortunately, I wouldn't want to move back to Python 2.6 (I'm actually using 3.2), and it looks like it is a bit awkward to use with C libraries (I make heavy use of numpy and scipy, among others).

kindall's other idea is instructive, but I think adapting this to multiple processes would be a little difficult - I suppose it would be easiest to replace the queue with file locking or a database.

Looking at ZODB again, it does look perfect for the task, but I really do want to avoid using any additional libraries. I'm still not entirely sure what all the issues with simply using flock are - I imagine one big problem is if a process is terminated while writing to the file, or before releasing the lock?

So, I've taken synthesizerpatel's advice and gone with sqlite3. If anyone's interested, I decided to make a drop-in replacement for dict that stores its entries as pickles in a database (I don't bother to keep any in memory as database access and pickling is fast enough compared to everything else I'm doing). I'm sure there are more efficient ways of doing this (and I've no idea whether I might still have concurrency issues), but here is the code:

from collections import MutableMapping
import sqlite3
import pickle


class PersistentDict(MutableMapping):
    def __init__(self, dbpath, iterable=None, **kwargs):
        self.dbpath = dbpath
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'create table if not exists memo '
                '(key blob primary key not null, value blob not null)'
            )
        if iterable is not None:
            self.update(iterable)
        self.update(kwargs)

    def encode(self, obj):
        return pickle.dumps(obj)

    def decode(self, blob):
        return pickle.loads(blob)

    def get_connection(self):
        return sqlite3.connect(self.dbpath)

    def  __getitem__(self, key):
        key = self.encode(key)
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select value from memo where key=?',
                (key,)
            )
            value = cursor.fetchone()
        if value is None:
            raise KeyError(key)
        return self.decode(value[0])

    def __setitem__(self, key, value):
        key = self.encode(key)
        value = self.encode(value)
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'insert or replace into memo values (?, ?)',
                (key, value)
            )

    def __delitem__(self, key):
        key = self.encode(key)
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select count(*) from memo where key=?',
                (key,)
            )
            if cursor.fetchone()[0] == 0:
                raise KeyError(key)
            cursor.execute(
                'delete from memo where key=?',
                (key,)
            )

    def __iter__(self):
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select key from memo'
            )
            records = cursor.fetchall()
        for r in records:
            yield self.decode(r[0])

    def __len__(self):
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select count(*) from memo'
            )
            return cursor.fetchone()[0]

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

锦欢 2025-01-13 14:13:49

sqlite3 开箱即用地提供 ACID。文件锁定很容易出现竞争条件和并发问题,而使用 sqlite3 则不会出现这些问题。

基本上,是的,sqlite3 超出了您的需要,但它并不是一个巨大的负担。它可以在手机上运行,​​所以这并不像你承诺运行一些糟糕的软件。它将节省您重新发明轮子和调试锁定问题的时间。

sqlite3 out of the box provides ACID. File locking is prone to race-conditions and concurrency problems that you won't have using sqlite3.

Basically, yeah, sqlite3 is more than what you need, but it's not a huge burden. It can run on mobile phones, so it's not like you're committing to running some beastly software. It's going to save you time reinventing wheels and debugging locking issues.

琉璃繁缕 2025-01-13 14:13:49

我假设您想继续将函数的结果存储在 RAM 中(可能存储在字典中),但使用持久性来减少应用程序的“预热”时间。在这种情况下,您不会直接在后备存储中随机访问项目,因此数据库可能确实有点过分了(尽管正如synthesizerpatel所指出的那样,可能不如您认为)。

不过,如果您想推出自己的字典,一个可行的策略可能是在运行开始时在启动任何线程之前从文件中简单地加载字典。当结果不在字典中时,您需要在将其添加到字典后将其写入文件。您可以通过将其添加到队列并使用单个工作线程将项目从队列刷新到磁盘来完成此操作(只需将它们附加到单个文件就可以了)。您偶尔可能会多次添加相同的结果,但这并不是致命的,因为每次都会得到相同的结果,因此读回两次或更多次不会造成真正的伤害。 Python 的线程模型将使您摆脱大多数并发问题(例如,追加到列表是原子的)。

这是一些(未经测试的、通用的、不完整的)代码,显示了我正在谈论的内容:

import cPickle as pickle

import time, os.path

cache = {}
queue = []

# run at script start to warm up cache
def preload_cache(filename):
    if os.path.isfile(filename):
        with open(filename, "rb") as f:
            while True:
                try:
                    key, value = pickle.load(f), pickle.load(f)
                except EOFError:
                    break
                cache[key] = value

# your memoized function
def time_consuming_function(a, b, c, d):
    key = (a, b, c, d)
    if key in cache:
        return cache[key]
    else:
        # generate the result here
        # ...
        # add to cache, checking to see if it's already there again to avoid writing
        # it twice (in case another thread also added it) (this is not fatal, though)
        if key not in cache:
            cache[key] = result
            queue.append((key, result))
        return result

# run on worker thread to write new items out
def write_cache(filename):
    with open(filename, "ab") as f:
        while True:
            while queue:
                key, value = queue.pop()  # item order not important
                # but must write key and value in single call to ensure
                # both get written (otherwise, interrupting script might
                # leave only one written, corrupting the file)
                f.write(pickle.dumps(key, pickle.HIGHEST_PROTOCOL) +
                        pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
            f.flush()
            time.sleep(1)

如果我有时间,我会把它变成一个装饰器......并将持久性放入 dict 子类中...全局变量的使用也不是最佳的。 :-) 如果您将此方法与 multiprocessing 结合使用,您可能会希望使用 multiprocessing.Queue 而不是列表;然后,您可以使用queue.get()作为阻塞等待,以等待写入文件的工作进程中的新结果。不过,我没有使用过多处理,所以请对这个建议持保留态度。

I assume you want to continue to memoize the results of the function in RAM, probably in a dictionary, but use the persistence to reduce the "warmup" time of the application. In this case you're not going to be randomly accessing items directly in the backing store so a database might indeed be overkill (though as synthesizerpatel notes, maybe not as much as you think).

Still, if you want to roll your own, a viable strategy might be to simply load the dictionary from a file at the beginning of your run before starting any threads. When a result isn't in the dictionary, then you need to write it to the file after adding it to the dictionary. You can do this by adding it to a queue and using a single worker thread that flushes items from the queue to disk (just appending them to a single file would be fine). You might occasionally add the same result more than once, but this is not fatal since it'll be the same result each time, so reading it back in twice or more will do no real harm. Python's threading model will keep you out of most kinds of concurrency trouble (e.g., appending to a list is atomic).

Here is some (untested, generic, incomplete) code showing what I'm talking about:

import cPickle as pickle

import time, os.path

cache = {}
queue = []

# run at script start to warm up cache
def preload_cache(filename):
    if os.path.isfile(filename):
        with open(filename, "rb") as f:
            while True:
                try:
                    key, value = pickle.load(f), pickle.load(f)
                except EOFError:
                    break
                cache[key] = value

# your memoized function
def time_consuming_function(a, b, c, d):
    key = (a, b, c, d)
    if key in cache:
        return cache[key]
    else:
        # generate the result here
        # ...
        # add to cache, checking to see if it's already there again to avoid writing
        # it twice (in case another thread also added it) (this is not fatal, though)
        if key not in cache:
            cache[key] = result
            queue.append((key, result))
        return result

# run on worker thread to write new items out
def write_cache(filename):
    with open(filename, "ab") as f:
        while True:
            while queue:
                key, value = queue.pop()  # item order not important
                # but must write key and value in single call to ensure
                # both get written (otherwise, interrupting script might
                # leave only one written, corrupting the file)
                f.write(pickle.dumps(key, pickle.HIGHEST_PROTOCOL) +
                        pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
            f.flush()
            time.sleep(1)

If I had time, I'd turn this into a decorator... and put the persistence into a dict subclass... the use of global variables is also sub-optimal. :-) If you use this approach with multiprocessing you'd probably want to use a multiprocessing.Queue rather than a list; you can then use queue.get() as a blocking wait for a new result in the worker process that writes to the file. I've not used multiprocessing, though, so take this bit of advice with a grain of salt.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文