可以在垃圾收集期间调用on_state_changed,请确保关闭或使用上下文管理器

发布于 2025-02-06 23:39:51 字数 1528 浏览 0 评论 0原文

我正在使用Python脚本通过WebSocket协议将API与Azure事件中心连接。当我运行脚本时,将发送数据,直到本文标题中的错误跳出并停止执行脚本。我看不到错误的起源,这就是为什么我感到困惑。 错误说:“ ...请确保关闭或使用上下文管理器。”我想知道哪个部分导致此错误以及如何解决该错误。

import asyncio
import json
import websockets

#for evh
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData


async def cryptocompare():

    producer = EventHubProducerClient.from_connection_string(conn_str="CONN_STR", eventhub_name="EVH_NAME")

    
    # this is where you paste your api key
    api_key = "API_KEY"
    url = "wss://streamer.cryptocompare.com/v2?api_key=" + api_key
    async with websockets.connect(url) as websocket:
        await websocket.send(json.dumps({
            "action": "SubAdd",
            "subs": ["0~Coinbase~BTC~EUR","0~Coinbase~BTC~USD","0~Coinbase~BTC~CHF"],
        }))
        while True:
            try:
                data = await websocket.recv()
            except websockets.ConnectionClosed:
                break
            try:
                #data = json.loads(data)
                event_data_batch = await producer.create_batch()

                # Add events to the batch.
                #for i in data:
                event_data_batch.add(EventData(data))
                # Send the batch of events to the event hub.
                await producer.send_batch(event_data_batch)
                print(json.dumps(data, indent=4))
            except ValueError:
                print(data)


asyncio.get_event_loop().run_until_complete(cryptocompare())

I am using Python script for connecting API with Azure Event Hub via websocket protocol. When I run the script, data is being sent until error from the title of this post jumps out and stops execution of the script. I cannot see the origin of the error and that's why I'm confused.
Error says '...please be sure to close or use a context manager.' I wonder which part causes this error and how to possibly solve it.

import asyncio
import json
import websockets

#for evh
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData


async def cryptocompare():

    producer = EventHubProducerClient.from_connection_string(conn_str="CONN_STR", eventhub_name="EVH_NAME")

    
    # this is where you paste your api key
    api_key = "API_KEY"
    url = "wss://streamer.cryptocompare.com/v2?api_key=" + api_key
    async with websockets.connect(url) as websocket:
        await websocket.send(json.dumps({
            "action": "SubAdd",
            "subs": ["0~Coinbase~BTC~EUR","0~Coinbase~BTC~USD","0~Coinbase~BTC~CHF"],
        }))
        while True:
            try:
                data = await websocket.recv()
            except websockets.ConnectionClosed:
                break
            try:
                #data = json.loads(data)
                event_data_batch = await producer.create_batch()

                # Add events to the batch.
                #for i in data:
                event_data_batch.add(EventData(data))
                # Send the batch of events to the event hub.
                await producer.send_batch(event_data_batch)
                print(json.dumps(data, indent=4))
            except ValueError:
                print(data)


asyncio.get_event_loop().run_until_complete(cryptocompare())

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

歌枕肩 2025-02-13 23:39:51

要解决此错误,您可以用ync用生产者:将代码包裹代码。

    async with producer:
        while True:
            try:
                data = await websocket.recv()
            except websockets.ConnectionClosed:
                break
            try:
                #data = json.loads(data)
                event_data_batch = await producer.create_batch()

                # Add events to the batch.
                #for i in data:
                event_data_batch.add(EventData(data))
                # Send the batch of events to the event hub.
                await producer.send_batch(event_data_batch)
                print(json.dumps(data, indent=4))
            except ValueError:
                print(data)

当客户端未正确关闭时,会发生此错误,因为在尝试垃圾收集时,发件人仍然打开。可以找到更详细的解释在这里

与上下文管理器发送的示例“ rel =“ nofollow noreferrer”>在这里。

To fix this error, you can wrap the code where you call the producer with async with producer:.

    async with producer:
        while True:
            try:
                data = await websocket.recv()
            except websockets.ConnectionClosed:
                break
            try:
                #data = json.loads(data)
                event_data_batch = await producer.create_batch()

                # Add events to the batch.
                #for i in data:
                event_data_batch.add(EventData(data))
                # Send the batch of events to the event hub.
                await producer.send_batch(event_data_batch)
                print(json.dumps(data, indent=4))
            except ValueError:
                print(data)

This error occurs when the client is not closed properly, because the sender is still open when trying to garbage collect. More detailed explanation can be found here.

Sample of sending with context manager here.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文