Running CPU-bound tasks as separate processes in an asyncio application causes a significant slowdown

I have some HTML pages that I am trying to extract the text from using asynchronous web requests through aiohttp and asyncio; after extracting the text I save the files locally. I am using BeautifulSoup (inside extract_text()) to process the text from the response and extract the relevant text within the HTML page (excluding the code, etc.), but I am facing an issue where the synchronous version of the script is faster than my asynchronous + multiprocessing version.

As I understand it, using the BeautifulSoup function causes the main event loop to block within parse(), so based on these two StackOverflow questions [0, 1], I figured the best thing to do was to run extract_text() within its own process (as it is a CPU-bound task), which should prevent the event loop from blocking.

This results in the script taking 1.5x longer than the synchronous version (with no multiprocessing).

To confirm that this was not an issue with my implementation of the asynchronous code, I removed the use of extract_text() and instead saved the raw text from the response object. Doing this resulted in my asynchronous code being much faster, showing that the issue comes purely from extract_text() being run in a separate process.

Am I missing some important detail here?

import asyncio
from asyncio import Semaphore
import json
import logging
from pathlib import Path
from typing import List, Optional

import aiofiles
from aiohttp import ClientSession
import aiohttp
from bs4 import BeautifulSoup
import concurrent.futures
import functools


def extract_text(raw_text: str) -> str:
    return " ".join(BeautifulSoup(raw_text, "html.parser").stripped_strings)


async def fetch_text(
    url: str,
    session: ClientSession,
    semaphore: Semaphore,
    **kwargs: dict,
) -> str:
    async with semaphore:
        response = await session.request(method="GET", url=url, **kwargs)
        response.raise_for_status()
        logging.info("Got response [%s] for URL: %s", response.status, url)
        text = await response.text(encoding="utf-8")
        return text


async def parse(
    url: str,
    session: ClientSession,
    semaphore: Semaphore,
    **kwargs,
) -> Optional[str]:
    try:
        text = await fetch_text(
            url=url,
            session=session,
            semaphore=semaphore,
            **kwargs,
        )
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logging.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
    except Exception as e:
        logging.exception(
            "Non-aiohttp exception occured:  %s",
            getattr(e, "__dict__", None),
        )
    else:
        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            extract_text_ = functools.partial(extract_text, text)
            text = await loop.run_in_executor(pool, extract_text_)
            logging.info("Found text for %s", url)
            return text


async def process_file(
    url: dict,
    session: ClientSession,
    semaphore: Semaphore,
    **kwargs: dict,
) -> None:
    category = url.get("category")
    link = url.get("link")
    if category and link:
        text = await parse(
            url=f"{URL}/{link}",
            session=session,
            semaphore=semaphore,
            **kwargs,
        )
        if text:
            save_path = await get_save_path(
                link=link,
                category=category,
            )
            await write_file(html_text=text, path=save_path)
        else:
            logging.warning("Text for %s not found, skipping it...", link)


async def process_files(
    html_files: List[dict],
    semaphore: Semaphore,
) -> None:
    async with ClientSession() as session:
        tasks = [
            process_file(
                url=file,
                session=session,
                semaphore=semaphore,
            )
            for file in html_files
        ]
        await asyncio.gather(*tasks)


async def write_file(
    html_text: str,
    path: Path,
) -> None:
    # Write to file using aiofiles
    ...

async def get_save_path(link: str, category: str) -> Path:
    # return path to save
    ...

async def main_async(
    num_files: Optional[int],
    semaphore_count: int,
) -> None:
    html_files = # get all the files to process
    semaphore = Semaphore(semaphore_count)
    await process_files(
        html_files=html_files,
        semaphore=semaphore,
    )


if __name__ == "__main__":
    NUM_FILES = # passed through CLI args
    SEMAPHORE_COUNT = # passed through CLI args
    asyncio.run(
        main_async(
            num_files=NUM_FILES,
            semaphore_count=SEMAPHORE_COUNT,
        )
    )

SnakeViz charts across 1000 samples

  1. Async version with extract_text and multiprocessing
  2. Async version without extract_text
  3. Sync version with extract_text (notice how the html_parser from BeautifulSoup takes up the majority of the time here)
  4. Sync version without extract_text

(The SnakeViz screenshots for each of these runs were attached as images in the original post.)

Comments (1)

秋叶绚丽 2025-02-20 16:24:49

Here is roughly what your asynchronous program does:

  1. Launch num_files parse() tasks concurrently
  2. Each parse() task creates its own ProcessPoolExecutor and asynchronously awaits extract_text (which is executed in that freshly created process pool).

This is suboptimal for several reasons:

  1. It creates num_files process pools, which are expensive to create and take up memory
  2. Each pool is used for only a single operation, which is counterproductive: as many concurrent operations as possible should be submitted to a given pool

You are creating a new ProcessPoolExecutor each time the parse() function is called. You could instead instantiate it once (as a global, for instance, or passed through a function argument):

from concurrent.futures import ProcessPoolExecutor

async def parse(loop, executor, ...):
  ...
  # run_in_executor forwards extra positional arguments to the callable,
  # so functools.partial is no longer needed
  text = await loop.run_in_executor(executor, extract_text, text)

# and then in `process_file` (or `process_files`):

async def process_file(...):
  ...
  loop = asyncio.get_running_loop()
  with ProcessPoolExecutor() as executor:
    ...
    await parse(loop, executor, ...)

I benchmarked the overhead of creating a ProcessPoolExecutor on my old MacBook Air (2015), and it shows that it is quite slow (almost 100 ms for pool creation, submission and shutdown):

from time import perf_counter
from concurrent.futures import ProcessPoolExecutor

def noop():
    # a module-level function is used because lambdas cannot be pickled
    # by ProcessPoolExecutor
    return None

def main_1():
    """Pool created once"""
    reps = 100
    t1 = perf_counter()
    with ProcessPoolExecutor() as executor:
        for _ in range(reps):
            executor.submit(noop)
    t2 = perf_counter()
    print(f"{(t2 - t1) / reps * 1_000} ms")  # 2 ms/it

def main_2():
    """Pool created at each iteration"""
    reps = 100
    t1 = perf_counter()
    for _ in range(reps):
        with ProcessPoolExecutor() as executor:
            executor.submit(noop)
    t2 = perf_counter()
    print(f"{(t2 - t1) / reps * 1_000} ms")  # 100 ms/it

if __name__ == "__main__":
    main_1()
    main_2()

You may hoist it up even further, into the process_files function, which avoids recreating the pool for each file; a rough sketch of this is shown below.
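
To make that concrete, here is a minimal sketch (not the original poster's code) of creating the pool once in process_files and passing it down explicitly. It assumes process_file() and parse() gain an explicit executor parameter rather than receiving it through **kwargs, since those kwargs are forwarded to session.request(); it also reuses fetch_text() and extract_text() from the question and omits the original error handling for brevity. Given the roughly 100 ms start-up cost measured above, creating one pool per file across 1000 files would add on the order of 100 seconds by itself, which is consistent with the observed slowdown.

import asyncio
from asyncio import Semaphore
from concurrent.futures import ProcessPoolExecutor
from typing import List, Optional

from aiohttp import ClientSession


async def process_files(
    html_files: List[dict],
    semaphore: Semaphore,
) -> None:
    # Create the process pool once for the whole run and share it
    # between all process_file()/parse() tasks.
    with ProcessPoolExecutor() as executor:
        async with ClientSession() as session:
            tasks = [
                process_file(
                    url=file,
                    session=session,
                    semaphore=semaphore,
                    executor=executor,  # hypothetical added parameter
                )
                for file in html_files
            ]
            await asyncio.gather(*tasks)


async def parse(
    url: str,
    session: ClientSession,
    semaphore: Semaphore,
    executor: ProcessPoolExecutor,  # hypothetical added parameter
    **kwargs,
) -> Optional[str]:
    # fetch_text() and extract_text() are the functions from the question
    text = await fetch_text(url=url, session=session, semaphore=semaphore, **kwargs)
    loop = asyncio.get_running_loop()
    # run_in_executor forwards positional arguments, so no functools.partial is needed
    return await loop.run_in_executor(executor, extract_text, text)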

Also, try to inspect your first SnakeViz chart more closely to find out what exactly inside process.py:submit is taking so much time.


One last thing, be careful of the semantics of using a context manager on an executor:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  for i in range(100):
    executor.submit(some_work, i)

Not only does this create an executor and submit work to it, it also waits for all submitted work to finish before exiting the with statement.
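
As a small, hypothetical illustration of that (not from the original answer): submit() returns almost immediately, but leaving the with block only happens once every submitted task has completed.

import time
from time import perf_counter
from concurrent.futures import ProcessPoolExecutor


def slow_task(seconds: float) -> None:
    # stand-in for a CPU-bound job; a module-level function so it can be pickled
    time.sleep(seconds)


if __name__ == "__main__":
    t0 = perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        for _ in range(4):
            executor.submit(slow_task, 1.0)  # returns immediately
        t_submitted = perf_counter()
    t_exited = perf_counter()
    print(f"submitting took {t_submitted - t0:.2f} s")  # a fraction of a second
    print(f"exiting the with block took {t_exited - t_submitted:.2f} s")  # roughly 1 s plus worker start-up: shutdown waits for the tasks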
