Running a CPU-bound task as a separate process in an asyncio application causes a significant slowdown
I have some HTML pages that I am trying to extract text from using asynchronous web requests via aiohttp and asyncio. I use BeautifulSoup (in extract_text()) to process each response and extract the relevant text from the HTML page (excluding code, etc.), but I am running into the problem that the synchronous version of my script is faster than my async + multiprocessing version.
As I understand it, calling the BeautifulSoup function blocks the main event loop inside parse(), so based on these two Stack Overflow questions [0, 1], I figured the best approach was to run extract_text() in its own process (as a CPU-bound task), which should prevent the event loop from blocking.
This causes the script to take about 1.5x longer than the synchronous version (which uses no multiprocessing).
To confirm this was not a problem with my implementation of the async code, I removed the call to extract_text() and instead saved the raw text from the response object. Doing so made my async code much faster, indicating that the problem comes purely from running extract_text() in a separate process.
Am I missing some important detail here?
import asyncio
from asyncio import Semaphore
import json
import logging
from pathlib import Path
from typing import List, Optional
import aiofiles
from aiohttp import ClientSession
import aiohttp
from bs4 import BeautifulSoup
import concurrent.futures
import functools
def extract_text(raw_text: str) -> str:
return " ".join(BeautifulSoup(raw_text, "html.parser").stripped_strings)
async def fetch_text(
url: str,
session: ClientSession,
semaphore: Semaphore,
**kwargs: dict,
) -> str:
async with semaphore:
response = await session.request(method="GET", url=url, **kwargs)
response.raise_for_status()
logging.info("Got response [%s] for URL: %s", response.status, url)
text = await response.text(encoding="utf-8")
return text
async def parse(
url: str,
session: ClientSession,
semaphore: Semaphore,
**kwargs,
) -> Optional[str]:
try:
text = await fetch_text(
url=url,
session=session,
semaphore=semaphore,
**kwargs,
)
except (
aiohttp.ClientError,
aiohttp.http_exceptions.HttpProcessingError,
) as e:
logging.error(
"aiohttp exception for %s [%s]: %s",
url,
getattr(e, "status", None),
getattr(e, "message", None),
)
except Exception as e:
logging.exception(
"Non-aiohttp exception occured: %s",
getattr(e, "__dict__", None),
)
else:
loop = asyncio.get_running_loop()
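        # NOTE: a new process pool is created (and shut down) on every parse() call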
with concurrent.futures.ProcessPoolExecutor() as pool:
extract_text_ = functools.partial(extract_text, text)
text = await loop.run_in_executor(pool, extract_text_)
logging.info("Found text for %s", url)
return text
async def process_file(
url: dict,
session: ClientSession,
semaphore: Semaphore,
**kwargs: dict,
) -> None:
category = url.get("category")
link = url.get("link")
if category and link:
text = await parse(
url=f"{URL}/{link}",
session=session,
semaphore=semaphore,
**kwargs,
)
if text:
save_path = await get_save_path(
link=link,
category=category,
)
await write_file(html_text=text, path=save_path)
else:
logging.warning("Text for %s not found, skipping it...", link)
async def process_files(
html_files: List[dict],
semaphore: Semaphore,
) -> None:
async with ClientSession() as session:
tasks = [
process_file(
url=file,
session=session,
semaphore=semaphore,
)
for file in html_files
]
await asyncio.gather(*tasks)
async def write_file(
html_text: str,
path: Path,
) -> None:
# Write to file using aiofiles
...
async def get_save_path(link: str, category: str) -> Path:
# return path to save
...
async def main_async(
num_files: Optional[int],
semaphore_count: int,
) -> None:
html_files = # get all the files to process
semaphore = Semaphore(semaphore_count)
await process_files(
html_files=html_files,
semaphore=semaphore,
)
if __name__ == "__main__":
NUM_FILES = # passed through CLI args
SEMAPHORE_COUNT = # passed through CLI args
asyncio.run(
main_async(
num_files=NUM_FILES,
semaphore_count=SEMAPHORE_COUNT,
)
)
SnakeViz charts over 1000 samples:
- async version with extract_text and multiprocessing
- async version without extract_text
- sync version with extract_text (note how html_parser from BeautifulSoup takes up most of the time here)
- sync version
1 Answer
Here is roughly what your asynchronous program does:

- It launches num_files parse() tasks concurrently.
- Each parse() task creates its own ProcessPoolExecutor and asynchronously awaits extract_text (which is executed in the previously created process pool).

This is suboptimal for several reasons:
- It creates num_files process pools, which are expensive to create and take memory.

You are creating a new ProcessPoolExecutor each time the parse() function is called. You could try to instantiate it once (as a global, for instance, or passed through a function argument):
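For example, a minimal sketch reusing the question's names (the exact plumbing is an assumption on my part): create the pool once in main_async() and hand it down through process_files() to parse():

import asyncio
import concurrent.futures
import functools

async def parse(url, session, semaphore, pool, **kwargs):
    # fetch_text() and the error handling from the question go here.
    text = await fetch_text(url=url, session=session, semaphore=semaphore, **kwargs)
    loop = asyncio.get_running_loop()
    # Reuse the single shared pool instead of creating one per call.
    return await loop.run_in_executor(pool, functools.partial(extract_text, text))

async def main_async(num_files, semaphore_count):
    html_files = ...  # get all the files to process
    semaphore = asyncio.Semaphore(semaphore_count)
    # One pool for the whole run; process_files() forwards it to each parse() call.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        await process_files(html_files=html_files, semaphore=semaphore, pool=pool)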
I benchmarked the overhead of creating a ProcessPoolExecutor on my old MacBook Air 2015, and it shows that it is quite slow (almost 100 ms for pool creation, opening, submit, and shutdown):
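As a rough illustration (my own sketch, not the exact benchmark), one way to measure that overhead is to time a full create/submit/shutdown cycle:

import concurrent.futures
import time

def noop():
    pass

if __name__ == "__main__":
    start = time.perf_counter()
    # Create a pool, run one trivial task, and shut the pool down again.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        pool.submit(noop).result()
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"pool creation + submit + shutdown: {elapsed_ms:.1f} ms")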
You may again hoist it up into the process_files function, which avoids recreating the pool for each file.

Also, try to inspect your first SnakeViz chart more closely in order to find out what exactly in process.py:submit is taking that much time.

One last thing: be careful with the semantics of using a context manager on an executor:
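Roughly (illustrated here with the question's extract_text; raw_text is a stand-in variable):

import concurrent.futures

with concurrent.futures.ProcessPoolExecutor() as pool:
    future = pool.submit(extract_text, raw_text)
# ...is roughly equivalent to:
pool = concurrent.futures.ProcessPoolExecutor()
future = pool.submit(extract_text, raw_text)
pool.shutdown(wait=True)  # blocks until all submitted work has finished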
Not only does this create an executor and submit work to it, but it also waits for all work to finish before exiting the with statement.