您如何正确处理python中异步代码的异常?

发布于 2025-01-20 21:25:38 字数 5336 浏览 1 评论 0原文

Main.py

import asyncio
import signal

from nats.aio.client import Client as NATS

from client.nats_client import NATSClient
from factory.logger import LoggerFactory
from helpers.environment import get_log_level

log = LoggerFactory.get_logger(__name__, log_level=get_log_level())


if __name__ == "__main__":
    """Using a, low-level asyncio API, to run the CustomClient coroutines"""
    client = NATSClient(nc=NATS())
    event_loop = asyncio.get_event_loop()
    event_loop.create_task(client.start())
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:  # CTRL-C was pressed by the user
        log.info("CTRL-C was pressed!")
    except asyncio.CancelledError:
        log.info("cancelled")
    finally:
        event_loop.close()

Client.py

from factory.logger import LoggerFactory
from helpers.environment import (
    get_log_level,
    get_nats_connection,
    get_nats_queue,
    get_nats_subscription_subject,
)

from client.callbacks import closed_cb, disconnected_cb, error_cb, reconnected_cb
from client.handlers import message_handler

log = LoggerFactory.get_logger(__name__, log_level=get_log_level())


class NATSClient:
    def __init__(self, nc):
        """_summary_

        Args:
            nc (_type_): _description_
        """
        self.nc = nc

    async def start(self):
        """_summary_"""
        try:
            servers = get_nats_connection()
            log.info(f"NATS client connection attempt to: {servers} ...")
            # Setting explicit callbacks and list of servers.
            await self.nc.connect(
                servers=servers,
                reconnected_cb=reconnected_cb,
                disconnected_cb=disconnected_cb,
                error_cb=error_cb,
                closed_cb=closed_cb,
            )
        except Exception as e:
            log.error(f"Encountered an error connecting client to server, {e}")

        client = self.nc

        if client.is_connected:
            log.info("NATS client successfully connected to server!")

            try:
                # Subscription using a queue so that only a single subscriber
                await client.subscribe(
                    subject=get_nats_subscription_subject(),
                    queue=get_nats_queue(),
                    cb=message_handler,
                )
                json.loads(on_purpose)  # throw exception on purpose
            except Exception as e:
                log.info("Encountered error during message handling!", e)

            await client.flush(1)

我遇到的问题是,如果我手动按CTRL+C,可以优雅地退出该程序,如果在NATS上的投票过程中发生某些事情,请重新输入以处理消息的循环队列消息没有处理,我必须用CTRL+C手动退出代码。

我还遇到了其他错误,例如:

    Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000259F51E3550>
Traceback (most recent call last):
  line 116, in __del__
    self.close()
  line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  line 746, in call_soon
    self._check_closed()
  line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.681 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<Client._read_loop() done, defined at \.venv\lib\site-packages\nats\aio\client.py:1908> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F53B0610>()]>>
2022-04-13 09:49:55.682 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<Client._ping_interval() done, defined at .venv\lib\site-packages\nats\aio\client.py:1891> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F5395E20>()]>>
Exception ignored in: <coroutine object Client._flusher at 0x00000259F53C28C0>
Traceback (most recent call last):
  line 1873, in _flusher
    future: asyncio.Future = await self._flush_queue.get()
  line 168, in get
    getter.cancel()  # Just in case getter is not done yet.
  line 746, in call_soon
    self._check_closed()
  line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.686 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-7' coro=<Client._flusher() done, defined at .venv\lib\site-packages\nats\aio\client.py:1861> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object Subscription._wait_for_msgs at 0x00000259F53C24C0>
Traceback (most recent call last):
  line 271, in _wait_for_msgs
    msg = await self._pending_queue.get()
  line 168, in get
    getter.cancel()  # Just in case getter is not done yet.
  line 746, in call_soon
    self._check_closed()
  line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.689 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<Subscription._wait_for_msgs() done, defined at .venv\lib\site-packages\nats\aio\subscription.py:262> wait_for=<Future cancelled>>

在理解这种异步概念和错误处理时,任何帮助都得到了极大的赞赏。

main.py

import asyncio
import signal

from nats.aio.client import Client as NATS

from client.nats_client import NATSClient
from factory.logger import LoggerFactory
from helpers.environment import get_log_level

log = LoggerFactory.get_logger(__name__, log_level=get_log_level())


if __name__ == "__main__":
    """Using a, low-level asyncio API, to run the CustomClient coroutines"""
    client = NATSClient(nc=NATS())
    event_loop = asyncio.get_event_loop()
    event_loop.create_task(client.start())
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:  # CTRL-C was pressed by the user
        log.info("CTRL-C was pressed!")
    except asyncio.CancelledError:
        log.info("cancelled")
    finally:
        event_loop.close()

client.py

from factory.logger import LoggerFactory
from helpers.environment import (
    get_log_level,
    get_nats_connection,
    get_nats_queue,
    get_nats_subscription_subject,
)

from client.callbacks import closed_cb, disconnected_cb, error_cb, reconnected_cb
from client.handlers import message_handler

log = LoggerFactory.get_logger(__name__, log_level=get_log_level())


class NATSClient:
    def __init__(self, nc):
        """_summary_

        Args:
            nc (_type_): _description_
        """
        self.nc = nc

    async def start(self):
        """_summary_"""
        try:
            servers = get_nats_connection()
            log.info(f"NATS client connection attempt to: {servers} ...")
            # Setting explicit callbacks and list of servers.
            await self.nc.connect(
                servers=servers,
                reconnected_cb=reconnected_cb,
                disconnected_cb=disconnected_cb,
                error_cb=error_cb,
                closed_cb=closed_cb,
            )
        except Exception as e:
            log.error(f"Encountered an error connecting client to server, {e}")

        client = self.nc

        if client.is_connected:
            log.info("NATS client successfully connected to server!")

            try:
                # Subscription using a queue so that only a single subscriber
                await client.subscribe(
                    subject=get_nats_subscription_subject(),
                    queue=get_nats_queue(),
                    cb=message_handler,
                )
                json.loads(on_purpose)  # throw exception on purpose
            except Exception as e:
                log.info("Encountered error during message handling!", e)

            await client.flush(1)

The issue I am having is being able to, gracefully, exit out of the program if I manually press ctrl+c, and re-enter the loop for handling messages if something happens during the polling process on the NATS Queue messages are not processed, and I have to manually exit the code with ctrl+c.

I have also encountered other errors such as:

    Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000259F51E3550>
Traceback (most recent call last):
  line 116, in __del__
    self.close()
  line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  line 746, in call_soon
    self._check_closed()
  line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.681 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<Client._read_loop() done, defined at \.venv\lib\site-packages\nats\aio\client.py:1908> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F53B0610>()]>>
2022-04-13 09:49:55.682 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<Client._ping_interval() done, defined at .venv\lib\site-packages\nats\aio\client.py:1891> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F5395E20>()]>>
Exception ignored in: <coroutine object Client._flusher at 0x00000259F53C28C0>
Traceback (most recent call last):
  line 1873, in _flusher
    future: asyncio.Future = await self._flush_queue.get()
  line 168, in get
    getter.cancel()  # Just in case getter is not done yet.
  line 746, in call_soon
    self._check_closed()
  line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.686 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-7' coro=<Client._flusher() done, defined at .venv\lib\site-packages\nats\aio\client.py:1861> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object Subscription._wait_for_msgs at 0x00000259F53C24C0>
Traceback (most recent call last):
  line 271, in _wait_for_msgs
    msg = await self._pending_queue.get()
  line 168, in get
    getter.cancel()  # Just in case getter is not done yet.
  line 746, in call_soon
    self._check_closed()
  line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.689 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<Subscription._wait_for_msgs() done, defined at .venv\lib\site-packages\nats\aio\subscription.py:262> wait_for=<Future cancelled>>

Any help is greatly appreciated in understanding this async concept and error handling.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文