python async错误:等待仅在异步函数中允许

发布于 2025-02-10 23:05:40 字数 1761 浏览 3 评论 0原文

我正在尝试异步读取文件,但是我遇到了一个错误,说“ 等待异步函数允许。”,从我的理解中,当等待关键字是在未标记为async的函数内部使用。但是,如下所示,我将read_blob函数标记为async

我在.ipynb jupiter Notebook中没有任何问题(我在其中运行的是按cell依次通过单元格代码),但是当我尝试在.py文件中的vscode中运行它时,它会引发错误。

这是我的代码:

from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO


class AsyncContainerClient(ContainerClient):

    async def read_blob(self,
                  blob_name: str,
                  add_blob_name_col=False,
                  add_blob_date_col=False,
                  preprocessing_func=None,
                  zip_regex=r'.+\.gz$',
                  csv_regex='.+\.csv(\.gz)?$',
                  parquet_regex='.+\.parquet$',
                  regex_string=None,
                  **kwargs):

        assert isinstance(blob_name, str), f'{blob_name} is not a string'

        try:
            blob = (await self.download_blob(blob_name))

            with BytesIO() as byte_stream:
                await blob.readinto(byte_stream)
                byte_stream.seek(0)
                return pd.read_parquet(byte_stream, engine='pyarrow')
        
        except ResourceNotFoundError:
            return 0


blob_sas_url = "https://proan.blob"
acc = AsyncContainerClient.from_container_url(blob_sas_url)

test_dirs = ["models1/model.parquet", "models2/model.parquet", "models3/model.parquet"]
res = await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))

“

I am trying read files asynchronously, but I am running into an error saying “await only allowed in async function.” From my understanding, this error occurs when the await keyword is used inside of a function that was not marked as async. However, as shown in the code below, I mark the read_blob function as async

I don’t have any issues running this in .ipynb Jupiter notebook (in which I run the code cell by cell sequentially), but when I try to run it in VSCode in a .py file, it throws an error.

Here is my code:

from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO


class AsyncContainerClient(ContainerClient):

    async def read_blob(self,
                  blob_name: str,
                  add_blob_name_col=False,
                  add_blob_date_col=False,
                  preprocessing_func=None,
                  zip_regex=r'.+\.gz

error_img

, csv_regex='.+\.csv(\.gz)?

error_img

, parquet_regex='.+\.parquet

error_img

, regex_string=None, **kwargs): assert isinstance(blob_name, str), f'{blob_name} is not a string' try: blob = (await self.download_blob(blob_name)) with BytesIO() as byte_stream: await blob.readinto(byte_stream) byte_stream.seek(0) return pd.read_parquet(byte_stream, engine='pyarrow') except ResourceNotFoundError: return 0 blob_sas_url = "https://proan.blob" acc = AsyncContainerClient.from_container_url(blob_sas_url) test_dirs = ["models1/model.parquet", "models2/model.parquet", "models3/model.parquet"] res = await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))

error_img

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

醉南桥 2025-02-17 23:05:40

异步程序的切入点应为asyncio.run,您应该用异步方法包装代码,然后调用它

from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO


class AsyncContainerClient(ContainerClient):

    async def read_blob(self,
                  blob_name: str,
                  add_blob_name_col=False,
                  add_blob_date_col=False,
                  preprocessing_func=None,
                  zip_regex=r'.+\.gz
,
                  csv_regex='.+\.csv(\.gz)?
,
                  parquet_regex='.+\.parquet
,
                  regex_string=None,
                  **kwargs):

        assert isinstance(blob_name, str), f'{blob_name} is not a string'

        try:
            blob = (await self.download_blob(blob_name))

            with BytesIO() as byte_stream:
                await blob.readinto(byte_stream)
                byte_stream.seek(0)
                return pd.read_parquet(byte_stream, engine='pyarrow')
        
        except ResourceNotFoundError:
            return 0


async def main():
    blob_sas_url = "https://proan.blob"
    acc = AsyncContainerClient.from_container_url(blob_sas_url)

    test_dirs = ["models1/model.parquet", "models2/model.parquet", 
    "models3/model.parquet"]
    return await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))

asyncio.run(main())

The entry point to an async program should be asyncio.run, you should wrap your code in an async method then call it

from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO


class AsyncContainerClient(ContainerClient):

    async def read_blob(self,
                  blob_name: str,
                  add_blob_name_col=False,
                  add_blob_date_col=False,
                  preprocessing_func=None,
                  zip_regex=r'.+\.gz
,
                  csv_regex='.+\.csv(\.gz)?
,
                  parquet_regex='.+\.parquet
,
                  regex_string=None,
                  **kwargs):

        assert isinstance(blob_name, str), f'{blob_name} is not a string'

        try:
            blob = (await self.download_blob(blob_name))

            with BytesIO() as byte_stream:
                await blob.readinto(byte_stream)
                byte_stream.seek(0)
                return pd.read_parquet(byte_stream, engine='pyarrow')
        
        except ResourceNotFoundError:
            return 0


async def main():
    blob_sas_url = "https://proan.blob"
    acc = AsyncContainerClient.from_container_url(blob_sas_url)

    test_dirs = ["models1/model.parquet", "models2/model.parquet", 
    "models3/model.parquet"]
    return await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))

asyncio.run(main())
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文