您如何正确处理python中异步代码的异常?
Main.py
import asyncio
import signal
from nats.aio.client import Client as NATS
from client.nats_client import NATSClient
from factory.logger import LoggerFactory
from helpers.environment import get_log_level
log = LoggerFactory.get_logger(__name__, log_level=get_log_level())
if __name__ == "__main__":
"""Using a, low-level asyncio API, to run the CustomClient coroutines"""
client = NATSClient(nc=NATS())
event_loop = asyncio.get_event_loop()
event_loop.create_task(client.start())
try:
event_loop.run_forever()
except KeyboardInterrupt: # CTRL-C was pressed by the user
log.info("CTRL-C was pressed!")
except asyncio.CancelledError:
log.info("cancelled")
finally:
event_loop.close()
Client.py
from factory.logger import LoggerFactory
from helpers.environment import (
get_log_level,
get_nats_connection,
get_nats_queue,
get_nats_subscription_subject,
)
from client.callbacks import closed_cb, disconnected_cb, error_cb, reconnected_cb
from client.handlers import message_handler
log = LoggerFactory.get_logger(__name__, log_level=get_log_level())
class NATSClient:
def __init__(self, nc):
"""_summary_
Args:
nc (_type_): _description_
"""
self.nc = nc
async def start(self):
"""_summary_"""
try:
servers = get_nats_connection()
log.info(f"NATS client connection attempt to: {servers} ...")
# Setting explicit callbacks and list of servers.
await self.nc.connect(
servers=servers,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
error_cb=error_cb,
closed_cb=closed_cb,
)
except Exception as e:
log.error(f"Encountered an error connecting client to server, {e}")
client = self.nc
if client.is_connected:
log.info("NATS client successfully connected to server!")
try:
# Subscription using a queue so that only a single subscriber
await client.subscribe(
subject=get_nats_subscription_subject(),
queue=get_nats_queue(),
cb=message_handler,
)
json.loads(on_purpose) # throw exception on purpose
except Exception as e:
log.info("Encountered error during message handling!", e)
await client.flush(1)
我遇到的问题是,如果我手动按CTRL+C,可以优雅地退出该程序,如果在NATS上的投票过程中发生某些事情,请重新输入以处理消息的循环队列消息没有处理,我必须用CTRL+C手动退出代码。
我还遇到了其他错误,例如:
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000259F51E3550>
Traceback (most recent call last):
line 116, in __del__
self.close()
line 108, in close
self._loop.call_soon(self._call_connection_lost, None)
line 746, in call_soon
self._check_closed()
line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.681 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<Client._read_loop() done, defined at \.venv\lib\site-packages\nats\aio\client.py:1908> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F53B0610>()]>>
2022-04-13 09:49:55.682 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<Client._ping_interval() done, defined at .venv\lib\site-packages\nats\aio\client.py:1891> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F5395E20>()]>>
Exception ignored in: <coroutine object Client._flusher at 0x00000259F53C28C0>
Traceback (most recent call last):
line 1873, in _flusher
future: asyncio.Future = await self._flush_queue.get()
line 168, in get
getter.cancel() # Just in case getter is not done yet.
line 746, in call_soon
self._check_closed()
line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.686 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-7' coro=<Client._flusher() done, defined at .venv\lib\site-packages\nats\aio\client.py:1861> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object Subscription._wait_for_msgs at 0x00000259F53C24C0>
Traceback (most recent call last):
line 271, in _wait_for_msgs
msg = await self._pending_queue.get()
line 168, in get
getter.cancel() # Just in case getter is not done yet.
line 746, in call_soon
self._check_closed()
line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.689 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<Subscription._wait_for_msgs() done, defined at .venv\lib\site-packages\nats\aio\subscription.py:262> wait_for=<Future cancelled>>
在理解这种异步概念和错误处理时,任何帮助都得到了极大的赞赏。
main.py
import asyncio
import signal
from nats.aio.client import Client as NATS
from client.nats_client import NATSClient
from factory.logger import LoggerFactory
from helpers.environment import get_log_level
log = LoggerFactory.get_logger(__name__, log_level=get_log_level())
if __name__ == "__main__":
"""Using a, low-level asyncio API, to run the CustomClient coroutines"""
client = NATSClient(nc=NATS())
event_loop = asyncio.get_event_loop()
event_loop.create_task(client.start())
try:
event_loop.run_forever()
except KeyboardInterrupt: # CTRL-C was pressed by the user
log.info("CTRL-C was pressed!")
except asyncio.CancelledError:
log.info("cancelled")
finally:
event_loop.close()
client.py
from factory.logger import LoggerFactory
from helpers.environment import (
get_log_level,
get_nats_connection,
get_nats_queue,
get_nats_subscription_subject,
)
from client.callbacks import closed_cb, disconnected_cb, error_cb, reconnected_cb
from client.handlers import message_handler
log = LoggerFactory.get_logger(__name__, log_level=get_log_level())
class NATSClient:
def __init__(self, nc):
"""_summary_
Args:
nc (_type_): _description_
"""
self.nc = nc
async def start(self):
"""_summary_"""
try:
servers = get_nats_connection()
log.info(f"NATS client connection attempt to: {servers} ...")
# Setting explicit callbacks and list of servers.
await self.nc.connect(
servers=servers,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
error_cb=error_cb,
closed_cb=closed_cb,
)
except Exception as e:
log.error(f"Encountered an error connecting client to server, {e}")
client = self.nc
if client.is_connected:
log.info("NATS client successfully connected to server!")
try:
# Subscription using a queue so that only a single subscriber
await client.subscribe(
subject=get_nats_subscription_subject(),
queue=get_nats_queue(),
cb=message_handler,
)
json.loads(on_purpose) # throw exception on purpose
except Exception as e:
log.info("Encountered error during message handling!", e)
await client.flush(1)
The issue I am having is being able to, gracefully, exit out of the program if I manually press ctrl+c, and re-enter the loop for handling messages if something happens during the polling process on the NATS Queue messages are not processed, and I have to manually exit the code with ctrl+c.
I have also encountered other errors such as:
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000259F51E3550>
Traceback (most recent call last):
line 116, in __del__
self.close()
line 108, in close
self._loop.call_soon(self._call_connection_lost, None)
line 746, in call_soon
self._check_closed()
line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.681 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<Client._read_loop() done, defined at \.venv\lib\site-packages\nats\aio\client.py:1908> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F53B0610>()]>>
2022-04-13 09:49:55.682 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<Client._ping_interval() done, defined at .venv\lib\site-packages\nats\aio\client.py:1891> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F5395E20>()]>>
Exception ignored in: <coroutine object Client._flusher at 0x00000259F53C28C0>
Traceback (most recent call last):
line 1873, in _flusher
future: asyncio.Future = await self._flush_queue.get()
line 168, in get
getter.cancel() # Just in case getter is not done yet.
line 746, in call_soon
self._check_closed()
line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.686 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-7' coro=<Client._flusher() done, defined at .venv\lib\site-packages\nats\aio\client.py:1861> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object Subscription._wait_for_msgs at 0x00000259F53C24C0>
Traceback (most recent call last):
line 271, in _wait_for_msgs
msg = await self._pending_queue.get()
line 168, in get
getter.cancel() # Just in case getter is not done yet.
line 746, in call_soon
self._check_closed()
line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.689 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<Subscription._wait_for_msgs() done, defined at .venv\lib\site-packages\nats\aio\subscription.py:262> wait_for=<Future cancelled>>
Any help is greatly appreciated in understanding this async concept and error handling.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论