异步监视器状态

发布于 2025-02-01 05:28:16 字数 3486 浏览 5 评论 0原文

我有一个高级案例,有很大需要监视线程状态。

我正在使用GraphQL库,并尝试合并对服务器的调用,但这只是我的用例。在任何情况下,都应适用同样的问题:

  1. 您必须确定要做什么
  2. 等待这些调用的数据的暂停分辨率
  3. 合并呼叫。
  4. 拨打电话。
  5. 以这些调用的结果恢复数据的原始分辨率。
  6. 不要阻止主线程。

由于asyncio在许多库中的扩散,这是不断出现的。这是我发现自己尝试的策略的简化:

# %%

import uuid
import asyncio
import random

_ids = {}

# process all the _id values at one time, after they have 
# been gathered.
async def do_something_with_ids(_ids):
    
    #print to show _ids are not gathered yet
    print(f'_IDS-0: {_ids}')
    
    # 'magic' sleep to wait for _ids to be gathered.
    # (cannot be in final build)
    await asyncio.sleep(3)
    
    # do actual processing
    _ids.update({k:bool(random.getrandbits(1)) for k in _ids.keys()})
    
    return _ids

async def async_test(_ids, t_process):
    
    # create id and add it to _ids
    _uuid = uuid.uuid4()
    _ids[_uuid] = None
    
    # wait for processing to finish
    await t_process
    
    #return value to original caller
    return _ids[_uuid]


t_process = asyncio.create_task(do_something_with_ids(_ids))

t1 = asyncio.create_task(async_test(_ids, t_process))
t2 = asyncio.create_task(async_test(_ids, t_process))
t3 = asyncio.create_task(async_test(_ids, t_process))
t4 = asyncio.create_task(async_test(_ids, t_process))

# Need something here to tell me that all my t{x} tasks
# have reached their awaits.
# .... ? what code to use?

# await processing
await t_process

# retrieve values
result = [await v for v in (t1, t2, t3, t4)]


display(
    result, 
)

注意:这是来自Vscode中的交互式Python文件。如果您想在其他地方运行它,则需要将最后两个等待的调用包装在异步函数中,然后创建一个循环来调用它。

要点是任务do_something_with_ids才能开始等待所有ardernc_test task等待。但是,async_test无法解决do_something_with_ids 完成。

我的示例只是一个简化,但是在生产中,需要假设在任何一个功能中,还有其他coroutines正在进行中。因此,在中do_something_with_idsasync_test可以按任何顺序解决任何数量的函数,但是do_something_with_ids_idsasync_test async_test ,需要等待以下顺序:

async_test
do_something_with_ids
async_test

有没有办法检查任务并确定是否被阻止?

编辑1 ------------------------------------------------

基于 @ Paulcornelius,我尝试了算法的新推导。

import uuid
import asyncio
import random
import time

_ids = {}

async def do_something_with_ids(_ids):
    print(f'_IDS-0: {_ids}')
    await asyncio.sleep(3)
    _ids.update({k:bool(random.getrandbits(1)) for k in _ids.keys()})
    return _ids

async def async_test(_ids, t_process):
    _uuid = uuid.uuid4()
    _ids[_uuid] = None

    #t_process is actually a Future here
    await t_process
    return _ids[_uuid]

#Set up future to use as a call back.
future = asyncio.Future()

t1 = asyncio.create_task(async_test(_ids, future))
t2 = asyncio.create_task(async_test(_ids, future))
t3 = asyncio.create_task(async_test(_ids, future))
t4 = asyncio.create_task(async_test(_ids, future))

await do_something_with_ids(_ids)
future.set_result(0)
result = [await v for v in (t1, t2, t3, t4)]

display(
    result, 
)

结果是:

_IDS-0: {}
[False, False, False, False]

因此,直到主线程达到第一个等待之前,计划的任务似乎才尝试:

await do_something_with_ids(_ids)

是否有办法等待直到被阻止?如果我们能做到这一点,我们可以等待t {x}任务,直到被阻止并在此处创建解决方案。

I have an advanced case where there is a great need to monitor thread status.

I'm using a graphql library and trying to consolidate calls to the server, however that is just my use case. This same problem should apply in any situation where:

  1. You must determine what calls to make
  2. Pause resolution of the data awaiting those calls
  3. Consolidate the calls to make.
  4. Make the calls.
  5. Resume the original resolution of the data with the result of those calls.
  6. Don't block the main thread.

This is something that keeps coming up due to the proliferation of asyncio across many libraries. Here is a simplification of the strategy I find myself trying:

# %%

import uuid
import asyncio
import random

_ids = {}

# process all the _id values at one time, after they have 
# been gathered.
async def do_something_with_ids(_ids):
    
    #print to show _ids are not gathered yet
    print(f'_IDS-0: {_ids}')
    
    # 'magic' sleep to wait for _ids to be gathered.
    # (cannot be in final build)
    await asyncio.sleep(3)
    
    # do actual processing
    _ids.update({k:bool(random.getrandbits(1)) for k in _ids.keys()})
    
    return _ids

async def async_test(_ids, t_process):
    
    # create id and add it to _ids
    _uuid = uuid.uuid4()
    _ids[_uuid] = None
    
    # wait for processing to finish
    await t_process
    
    #return value to original caller
    return _ids[_uuid]


t_process = asyncio.create_task(do_something_with_ids(_ids))

t1 = asyncio.create_task(async_test(_ids, t_process))
t2 = asyncio.create_task(async_test(_ids, t_process))
t3 = asyncio.create_task(async_test(_ids, t_process))
t4 = asyncio.create_task(async_test(_ids, t_process))

# Need something here to tell me that all my t{x} tasks
# have reached their awaits.
# .... ? what code to use?

# await processing
await t_process

# retrieve values
result = [await v for v in (t1, t2, t3, t4)]


display(
    result, 
)

NOTE: this is from an interactive python file in VSCode. If you want to run it somewhere else you need to wrap the final two await calls in an async function and create a loop to call it.

The gist is Task do_something_with_ids can't begin till all async_test tasks have awaited. However, async_test can't resolve till do_something_with_ids is finished.

My example is just a simplification, but in production it needs to be assumed that within either function there are other coroutines going on. So within do_something_with_ids and async_test any number of functions could be resolved in any order, but do_something_with_ids and async_test over all, need to await in this order:

async_test
do_something_with_ids
async_test

Is there a way to inspect a task and determine if it is blocked?

EDIT 1 -----------------------------------

Based on feed back from @PaulCornelius, I tried a new derivation of the algorithm.

import uuid
import asyncio
import random
import time

_ids = {}

async def do_something_with_ids(_ids):
    print(f'_IDS-0: {_ids}')
    await asyncio.sleep(3)
    _ids.update({k:bool(random.getrandbits(1)) for k in _ids.keys()})
    return _ids

async def async_test(_ids, t_process):
    _uuid = uuid.uuid4()
    _ids[_uuid] = None

    #t_process is actually a Future here
    await t_process
    return _ids[_uuid]

#Set up future to use as a call back.
future = asyncio.Future()

t1 = asyncio.create_task(async_test(_ids, future))
t2 = asyncio.create_task(async_test(_ids, future))
t3 = asyncio.create_task(async_test(_ids, future))
t4 = asyncio.create_task(async_test(_ids, future))

await do_something_with_ids(_ids)
future.set_result(0)
result = [await v for v in (t1, t2, t3, t4)]

display(
    result, 
)

The results of this are:

_IDS-0: {}
[False, False, False, False]

So, scheduled task don't seem to be attempted until the main thread hits the first await:

await do_something_with_ids(_ids)

Is there a way to await until blocked? If we could do that we could await the t{x} tasks until blocked and create a solution here.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文