Is this a sane implementation of a producer/consumer type of thing?

Posted on 2024-12-04 04:39:17

# file1.py
import subprocess
import threading

class _Producer(object):

  def __init__(self):
    self.chunksize = 6220800
    # Seed self.thing with a full-size chunk of zero bytes (binary mode),
    # so consumers never see a missing or short value before the first read.
    with open('/dev/zero', 'rb') as f:
      self.thing = f.read(self.chunksize)
    self.n = 0
    self.start()

  def start(self):
    def produce():
      self._proc = subprocess.Popen(['producer_proc'], stdout=subprocess.PIPE)
      while True:
        # Rebind self.thing to each new chunk; old chunks are never mutated.
        self.thing = self._proc.stdout.read(self.chunksize)
        if len(self.thing) != self.chunksize:
          msg = 'Expected {0} bytes.  Read {1} bytes'.format(self.chunksize, len(self.thing))
          raise Exception(msg)
        self.n += 1

    t = threading.Thread(target=produce)
    t.daemon = True
    t.start()
    self._thread = t

  def stop(self):
    if self._thread.is_alive():
      self._proc.terminate()
      self._thread.join(1)

# __init__ already calls start(), so the worker thread is started exactly once.
producer = _Producer()

I have written some code more or less like the design above, and now I want to be able to consume the output of producer_proc in other files like this:

# some_other_file.py
import file1
my_thing = file1.producer.thing 

Multiple other consumers might be grabbing a reference to file1.producer.thing, and they all need to consume from the same producer_proc. The producer_proc should never be blocked. Is this a sane implementation? Does the Python GIL make it thread safe, or do I need to reimplement it using a Queue to get data out of the worker thread? Do consumers need to explicitly make a copy of the thing?

I guess I am trying to implement something like the Producer/Consumer pattern or the Observer pattern, but I'm not really clear on all the technical details of design patterns.

  • A single producer is constantly making things.
  • Multiple consumers use things at arbitrary times.
  • producer.thing should be replaced by a fresh thing as soon as the new one is available. Most things will go unused, but that's OK.
  • It's OK for multiple consumers to read the same thing, or to read the same thing twice in succession. They only want to be sure they get the most recent thing when they ask for it, not some stale old thing.
  • A consumer should be able to keep using a thing as long as they have it in scope, even though the producer may have already overwritten its self.thing with a fresh new thing.
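
To make the last requirement concrete, here is a minimal sketch of what "keep using a thing after it has been overwritten" looks like in Python. The LatestValue class and demo function are hypothetical names used only for illustration and are not part of file1.py; the sketch assumes the producer only ever rebinds the attribute and never mutates the object in place.

```python
import threading


class LatestValue:
    """Hypothetical holder: one writer rebinds .thing, readers grab a reference."""

    def __init__(self, initial):
        self.thing = initial


def demo():
    holder = LatestValue(b"old")
    mine = holder.thing  # a consumer grabs a reference to the current thing

    # A writer thread rebinds the attribute to a brand-new bytes object.
    writer = threading.Thread(target=lambda: setattr(holder, "thing", b"new"))
    writer.start()
    writer.join()

    # The consumer's reference still points at the old object;
    # a fresh read of holder.thing sees the new one.
    return mine, holder.thing
```

The old object stays alive for as long as some consumer still holds a reference to it.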


Comments (1)

栖迟 2024-12-11 04:39:17

Given your (unusual!) requirements, your implementation seems correct. In particular,

  • If you're only updating one attribute, the Python GIL should be sufficient. Single bytecode instructions are atomic.
  • If you do anything more complex, add locking! It's basically harmless anyway: if you cared about performance or multicore scalability, you probably wouldn't be using Python!
  • In particular, be aware that self.thing and self.n in this code are updated in separate bytecode instructions. The GIL could be released and reacquired between them, so you can't get a consistent view of the two unless you add locking. If you're not going to do that, I'd suggest removing self.n, as it's an "attractive nuisance" (easily misused), or at least adding a comment/docstring with this caveat.
  • Consumers don't need to make a copy. You're never mutating the particular object pointed to by self.thing (and couldn't with string objects; they're immutable), and Python is garbage-collected, so as long as a consumer has grabbed a reference to it, it can keep accessing it without worrying too much about what other threads are doing. The worst that could happen is your program using a lot of memory because several generations of self.thing are being kept alive.

I'm a bit curious where your requirements came from, in particular that you don't care whether a thing is never used or is used many times.
