Python container with expiring values

Posted on 2024-10-02 21:01:59

I am after a thread-safe Python container whose values are automatically removed after a set time. Does such a class exist?

Comments (3)

窝囊感情。 2024-10-09 21:02:00

Here is a thread-safe version of ExpireCounter:

import datetime
import collections
import threading

class ExpireCounter:
    """Tracks how many events were added in the preceding time period."""

    def __init__(self, timeout=1):
        self.lock = threading.Lock()
        self.timeout = timeout  # seconds before an added event is dropped
        self.events = collections.deque()

    def add(self, item):
        """Add an event and schedule its removal after `timeout` seconds."""
        with self.lock:
            self.events.append(item)
            # One Timer thread per event; it calls expire() when it fires.
            threading.Timer(self.timeout, self.expire).start()

    def __len__(self):
        """Return the number of active events."""
        with self.lock:
            return len(self.events)

    def expire(self):
        """Remove the oldest event (called by the Timer started in add)."""
        with self.lock:
            self.events.popleft()

    def __str__(self):
        with self.lock:
            return str(self.events)

It can be used like this:

import time
c = ExpireCounter()
assert len(c) == 0
print(c)
# deque([])

c.add(datetime.datetime.now())
time.sleep(0.75)
c.add(datetime.datetime.now())
assert len(c) == 2
print(c)
# deque([datetime.datetime(2010, 11, 19, 8, 50, 0, 91426), datetime.datetime(2010, 11, 19, 8, 50, 0, 842715)])

time.sleep(0.75)
assert len(c) == 1
print(c)
# deque([datetime.datetime(2010, 11, 19, 8, 50, 0, 842715)])

昔梦 2024-10-09 21:02:00

Perhaps you want an LRU cache. Here's one I've been meaning to try out:

http://pypi.python.org/pypi/repoze.lru

It appears to be thread-safe.
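
Worth noting that a plain LRU cache evicts by recency of use, not by age, so on its own it would not drop values after a timeout. As a rough illustration of combining the two ideas, here is a minimal standard-library sketch. It is not the repoze.lru API: the class name `ExpiringLRU` and the `maxsize`, `ttl`, and `clock` parameters are made up for this example.

```python
import threading
import time
from collections import OrderedDict


class ExpiringLRU:
    """Hypothetical sketch: bounded mapping whose entries also expire by age."""

    def __init__(self, maxsize=128, ttl=1.0, clock=time.monotonic):
        self._maxsize = maxsize
        self._ttl = ttl                  # lifetime of an entry, in seconds
        self._clock = clock              # injectable so expiry can be tested
        self._lock = threading.Lock()
        self._data = OrderedDict()       # key -> (expires_at, value)

    def put(self, key, value):
        with self._lock:
            self._data[key] = (self._clock() + self._ttl, value)
            self._data.move_to_end(key)  # mark as most recently used
            while len(self._data) > self._maxsize:
                self._data.popitem(last=False)  # evict least recently used

    def get(self, key, default=None):
        with self._lock:
            entry = self._data.get(key)
            if entry is None:
                return default
            expires_at, value = entry
            if expires_at <= self._clock():
                del self._data[key]      # lazily drop the stale entry
                return default
            self._data.move_to_end(key)  # a hit refreshes recency, not the TTL
            return value
```

Stale entries are only removed when they are looked up or pushed out by `maxsize`, so no background thread is needed; the trade-off is that an expired entry nobody asks for keeps its slot until it is evicted.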

沉溺在你眼里的海 2024-10-09 21:02:00

This is more or less what I want for now:

from datetime import datetime, timedelta
from collections import deque

class ExpireCounter:
    """Tracks how many events were added in the preceding time period
    """

    def __init__(self, timeout=timedelta(seconds=1)):
        self.timeout = timeout
        self.events = deque()

    def add(self):
        """Add event time
        """
        self.events.append(datetime.now())

    def __len__(self):
        """Return number of active events
        """
        self.expire()
        return len(self.events)

    def expire(self):
        """Remove any expired events
        """
        now = datetime.now()
        try:
            while self.events[0] + self.timeout < now:
                self.events.popleft()
        except IndexError:
            pass # no more events


if __name__ == '__main__':
    import time
    c = ExpireCounter()
    assert len(c) == 0
    c.add()  # the method is named add(); inc() does not exist on this class
    time.sleep(0.75)
    c.add()
    assert len(c) == 2
    time.sleep(0.75)
    assert len(c) == 1
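
The class above expires lazily, on `len()`, but takes no lock, so it is not safe to share between threads as written. A possible variant, offered only as a sketch: guard the deque with a `threading.Lock` and take timestamps from `time.monotonic()`, which is unaffected by wall-clock adjustments. The name `LockedExpireCounter` and the `clock` parameter are assumptions introduced for this example, and `timeout` is in seconds here instead of a `timedelta`.

```python
import threading
import time
from collections import deque


class LockedExpireCounter:
    """Hypothetical variant: lazy expiry guarded by a lock."""

    def __init__(self, timeout=1.0, clock=time.monotonic):
        self._timeout = timeout      # seconds an event stays active
        self._clock = clock          # injectable so expiry can be tested
        self._lock = threading.Lock()
        self._events = deque()       # timestamps, oldest first

    def add(self):
        """Record an event at the current time."""
        with self._lock:
            self._events.append(self._clock())

    def __len__(self):
        """Drop expired events, then return the number still active."""
        with self._lock:
            cutoff = self._clock() - self._timeout
            while self._events and self._events[0] < cutoff:
                self._events.popleft()
            return len(self._events)
```

Compared with the Timer-based version, this starts no extra threads; expired timestamps simply stay in the deque until the next `len()` call.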