使用Asyncio的TQDM进度栏

发布于 2025-02-07 21:45:54 字数 768 浏览 2 评论 0原文

我正在尝试创建一个单个进度栏,每当完成异步任务时,该栏将进行更新。

我有以下代码上述

scan_results = []
tasks = [self.run_scan(i) for i in input_paths]

pbar = tqdm(total=len(tasks), desc='Scanning files')

for f in asyncio.as_completed(tasks):
    value = await f
    pbar.update(1)
    scan_results.append(value)

代码生成一个单个进度栏,但是直到所有任务完成(在一个任务中显示为0%或100%)之前,它才会更新,而单个任务超过一个任务)

也尝试使用tqdm。 asyncio.tqdm.gather

with tqdm(total=len(input_paths)):
     scan_results = await tqdm.gather(*[self.run_scan(i) for i in input_paths])

以上代码生成多个进度条,就像在上一个代码块中一样,它显示为0%或100%

我的起点是,我的起点是

scan_results = await asyncio.gather(*[self.run_scan(i)for i in input_paths])

您可以帮助您与单个和动态进度一起使用的帮助酒吧

I am trying to create a single progress bar that will be updated whenever an async task is done.

I have the following code

scan_results = []
tasks = [self.run_scan(i) for i in input_paths]

pbar = tqdm(total=len(tasks), desc='Scanning files')

for f in asyncio.as_completed(tasks):
    value = await f
    pbar.update(1)
    scan_results.append(value)

The above code generates a single progress bar as but it is not updated until all tasks are finished (it shows either 0% or 100% while there's more than a single task)

I also tried using tqdm.asyncio.tqdm.gather

with tqdm(total=len(input_paths)):
     scan_results = await tqdm.gather(*[self.run_scan(i) for i in input_paths])

The above code generates multiple progress bars and as in the previous code block, it shows either 0% or 100%

My starting point was

scan_results = await asyncio.gather(*[self.run_scan(i)for i in input_paths])

Would appreciate your help on making it work with a single and dynamic progress bar

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

ζ澈沫 2025-02-14 21:45:54

如果您调用self.pbar.update(1)run_scan扫描方法中创建并发任务后,每个任务将更新pbar for self 。所以你的班级应该看起来如下

class Cls:
    async def run_scan(self, path):
        ...
        self.pbar.update(1)

    def scan(self, input_paths):
        loop = asyncio.get_event_loop()
        tasks = [loop.create_task(self.run_scan(i)) for i in input_paths]
        self.pbar = tqdm(total=len(input_paths), desc='Scanning files')
        loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()

If you call self.pbar.update(1) inside the run_scan scan method after creating concurrent tasks, each task will update the pbar for self. So your class should look like the following

class Cls:
    async def run_scan(self, path):
        ...
        self.pbar.update(1)

    def scan(self, input_paths):
        loop = asyncio.get_event_loop()
        tasks = [loop.create_task(self.run_scan(i)) for i in input_paths]
        self.pbar = tqdm(total=len(input_paths), desc='Scanning files')
        loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文