如何在Python中迭代Queue.Queue项目?

发布于 2024-12-16 16:52:40 字数 133 浏览 2 评论 0原文

有谁知道迭代 Queue.Queue 元素的 Pythonic 方法,而不将它们从队列中删除。我有一个生产者/消费者类型的程序,其中要处理的项目是使用 Queue.Queue 传递的,并且我希望能够打印剩余的项目是什么。有什么想法吗?

Does anyone know a pythonic way of iterating over the elements of a Queue.Queue without removing them from the Queue. I have a producer/consumer-type program where items to be processed are passed by using a Queue.Queue, and I want to be able to print what the remaining items are. Any ideas?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

命硬 2024-12-23 16:52:40

您可以循环访问底层数据存储的副本:

for elem in list(q.queue)

尽管这绕过了队列对象的锁,但列表副本是一个原子操作,应该可以正常工作。

如果您想保留锁,为什么不将所有任务从队列中取出,复制列表,然后将它们放回去。

mycopy = []
while True:
     try:
         elem = q.get(block=False)
     except Empty:
         break
     else:
         mycopy.append(elem)
for elem in mycopy:
    q.put(elem)
for elem in mycopy:
    # do something with the elements

You can loop over a copy of the underlying data store:

for elem in list(q.queue)

Eventhough this bypasses the locks for Queue objects, the list copy is an atomic operation and it should work out fine.

If you want to keep the locks, why not pull all the tasks out of the queue, make your list copy, and then put them back.

mycopy = []
while True:
     try:
         elem = q.get(block=False)
     except Empty:
         break
     else:
         mycopy.append(elem)
for elem in mycopy:
    q.put(elem)
for elem in mycopy:
    # do something with the elements
画骨成沙 2024-12-23 16:52:40

列出队列元素而不消耗它们:

>>> from Queue import Queue
>>> q = Queue()
>>> q.put(1)
>>> q.put(2)
>>> q.put(3)
>>> print list(q.queue)
[1, 2, 3]

操作后,您仍然可以处理它们:

>>> q.get()
1
>>> print list(q.queue)
[2, 3]

Listing queue elements without consuming them:

>>> from Queue import Queue
>>> q = Queue()
>>> q.put(1)
>>> q.put(2)
>>> q.put(3)
>>> print list(q.queue)
[1, 2, 3]

After operation, you can still process them:

>>> q.get()
1
>>> print list(q.queue)
[2, 3]
-小熊_ 2024-12-23 16:52:40

您可以子类化 queue.Queue 以线程安全的方式实现此目的:

import queue


class ImprovedQueue(queue.Queue):
    def to_list(self):
        """
        Returns a copy of all items in the queue without removing them.
        """

        with self.mutex:
            return list(self.queue)

You can subclass queue.Queue to achieve this in a thread-safe way:

import queue


class ImprovedQueue(queue.Queue):
    def to_list(self):
        """
        Returns a copy of all items in the queue without removing them.
        """

        with self.mutex:
            return list(self.queue)
奢华的一滴泪 2024-12-23 16:52:40

您可以在打印元素之前将双端队列转换为列表,以便可以轻松地迭代它。

from collections import deque

d = deque([7,9,3,5])

d.append(2)
d.appendleft(1)
d.append(10)
d.pop()

for elem in list(d):
    print(elem, end=" ")

#Output: 1 7 9 3 5 2 

You can convert the deque into a list before printing the elements so that you can easily iterate through it.

from collections import deque

d = deque([7,9,3,5])

d.append(2)
d.appendleft(1)
d.append(10)
d.pop()

for elem in list(d):
    print(elem, end=" ")

#Output: 1 7 9 3 5 2 
涙—继续流 2024-12-23 16:52:40

我已经实现了一个支持async for迭代的IterableQueue(asyncio.Queue)。请参阅 pyutils 中的 GitHub

from pyutils import IterableQueue
from asyncio import run, Task, create_task

async def producer(Q: IterableQueue[int], n: int) -> None:
    await Q.add_producer(N=1) 
    for i in range(n):
        await Q.put(i)
    await Q.finish()
    return None

async def amain():
    q : IterableQueue[int] = IterableQueue(maxsize=5)
    task : Task = create_task(producer(q, 10))
    # Iterate over queue items
    async for i in q:
        print(f"Got {i}")

if __name__ == "__main__":
    run(amain())

IterableQueue() 使用 add_ Producer() 对生产者进行计数。一旦最后一个生产者完成(finish()),那么一个哨兵值(None)就会添加到队列中,标记队列结束。

I have implemented an IterableQueue(asyncio.Queue) that supports async for iteration. See pyutils in GitHub.

from pyutils import IterableQueue
from asyncio import run, Task, create_task

async def producer(Q: IterableQueue[int], n: int) -> None:
    await Q.add_producer(N=1) 
    for i in range(n):
        await Q.put(i)
    await Q.finish()
    return None

async def amain():
    q : IterableQueue[int] = IterableQueue(maxsize=5)
    task : Task = create_task(producer(q, 10))
    # Iterate over queue items
    async for i in q:
        print(f"Got {i}")

if __name__ == "__main__":
    run(amain())

IterableQueue() counts producers with add_producer(). Once the last producers finishes (finish()) then a sentinel value (None) is added to the queue marking the queue end.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文