Monitoring async task status
I have an advanced case where there is a great need to monitor thread status.
I'm using a GraphQL library and trying to consolidate calls to the server, but that is just my use case. The same problem should apply in any situation where:
- You must determine what calls to make.
- Pause resolution of the data awaiting those calls.
- Consolidate the calls to make.
- Make the calls.
- Resume the original resolution of the data with the result of those calls.
- Don't block the main thread. (A generic sketch of this pattern follows the list.)
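To make the list above concrete, here is a minimal, generic sketch of that batching pattern, with a hypothetical `fetch_batch` coroutine standing in for the consolidated server call (this is not the GraphQL library's API, and the `asyncio.sleep(0)` line is exactly the "wait until every caller has registered" step that the rest of this question is about):

```python
import asyncio

# Hypothetical batched backend call; stands in for one consolidated
# request to the server. Not any particular library's API.
async def fetch_batch(keys):
    return {k: f"value-for-{k}" for k in keys}

class Batcher:
    """Collect keys from many callers, then resolve them with one batched call."""

    def __init__(self):
        self._pending = {}  # key -> Future awaited by the caller

    async def load(self, key):
        # Caller registers its key and suspends until the batch resolves it.
        fut = asyncio.get_running_loop().create_future()
        self._pending[key] = fut
        return await fut

    async def dispatch(self):
        # Make one consolidated call and resume every waiting caller.
        pending, self._pending = self._pending, {}
        results = await fetch_batch(list(pending))
        for key, fut in pending.items():
            fut.set_result(results[key])

async def main():
    batcher = Batcher()
    callers = [asyncio.create_task(batcher.load(k)) for k in ("a", "b", "c")]
    await asyncio.sleep(0)   # let every caller register (the crux of the question)
    await batcher.dispatch()
    print(await asyncio.gather(*callers))

asyncio.run(main())
```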
This is something that keeps coming up due to the proliferation of `asyncio` across many libraries. Here is a simplification of the strategy I find myself trying:
```python
# %%
import uuid
import asyncio
import random

_ids = {}

# process all the _id values at one time, after they have
# been gathered.
async def do_something_with_ids(_ids):
    # print to show _ids are not gathered yet
    print(f'_IDS-0: {_ids}')
    # 'magic' sleep to wait for _ids to be gathered.
    # (cannot be in final build)
    await asyncio.sleep(3)
    # do actual processing
    _ids.update({k: bool(random.getrandbits(1)) for k in _ids.keys()})
    return _ids

async def async_test(_ids, t_process):
    # create id and add it to _ids
    _uuid = uuid.uuid4()
    _ids[_uuid] = None
    # wait for processing to finish
    await t_process
    # return value to original caller
    return _ids[_uuid]

t_process = asyncio.create_task(do_something_with_ids(_ids))
t1 = asyncio.create_task(async_test(_ids, t_process))
t2 = asyncio.create_task(async_test(_ids, t_process))
t3 = asyncio.create_task(async_test(_ids, t_process))
t4 = asyncio.create_task(async_test(_ids, t_process))

# Need something here to tell me that all my t{x} tasks
# have reached their awaits.
# .... ? what code to use?

# await processing
await t_process

# retrieve values
result = [await v for v in (t1, t2, t3, t4)]

display(
    result,
)
```
NOTE: this is from an interactive Python file in VSCode. If you want to run it somewhere else, you need to wrap the final two awaited calls in an async function and create an event loop to call it.
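For reference, a minimal sketch of that wrapping, reusing `do_something_with_ids`, `async_test`, and `_ids` from the block above and replacing the notebook-only `display` with `print`:

```python
import asyncio

async def main():
    t_process = asyncio.create_task(do_something_with_ids(_ids))
    tasks = [asyncio.create_task(async_test(_ids, t_process)) for _ in range(4)]
    # await processing, then collect each caller's result
    await t_process
    result = [await t for t in tasks]
    print(result)

asyncio.run(main())
```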
The gist is that the task `do_something_with_ids` can't begin until all the `async_test` tasks have awaited. However, `async_test` can't resolve until `do_something_with_ids` is finished.
My example is just a simplification, but in production it needs to be assumed that within either function there are other coroutines in flight. So within `do_something_with_ids` and `async_test` any number of functions could resolve in any order, but overall `do_something_with_ids` and `async_test` need to await in this order:

1. `async_test` (register the ids and suspend)
2. `do_something_with_ids` (process the gathered ids)
3. `async_test` (resume and return the results)

Is there a way to inspect a task and determine if it is blocked?
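For what it's worth, here is one way that ordering could be coordinated with plain asyncio primitives. This is only a sketch and sidesteps the hard part of the question: it assumes the number of `async_test` callers (`N_CALLERS`) is known in advance, so the last caller can signal that registration is complete; detecting "everyone has reached their await" without that knowledge is exactly what is being asked.

```python
import asyncio
import random
import uuid

# Sketch only: assumes the number of async_test callers is known up front,
# which the real use case may not allow.
N_CALLERS = 4

async def do_something_with_ids(_ids, all_registered, processing_done):
    await all_registered.wait()   # step 2 starts only after every caller registers
    _ids.update({k: bool(random.getrandbits(1)) for k in _ids})
    processing_done.set()         # unblocks step 3

async def async_test(_ids, all_registered, processing_done):
    _uuid = uuid.uuid4()
    _ids[_uuid] = None            # step 1: register this caller's id
    if len(_ids) == N_CALLERS:
        all_registered.set()      # last caller signals the batch is complete
    await processing_done.wait()  # step 3: wait for the batch, then resume
    return _ids[_uuid]

async def main():
    _ids = {}
    all_registered = asyncio.Event()
    processing_done = asyncio.Event()
    callers = [
        asyncio.create_task(async_test(_ids, all_registered, processing_done))
        for _ in range(N_CALLERS)
    ]
    await do_something_with_ids(_ids, all_registered, processing_done)
    print([await t for t in callers])

asyncio.run(main())
```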
EDIT 1 -----------------------------------
Based on feedback from @PaulCornelius, I tried a new derivation of the algorithm.
```python
import uuid
import asyncio
import random
import time

_ids = {}

async def do_something_with_ids(_ids):
    print(f'_IDS-0: {_ids}')
    await asyncio.sleep(3)
    _ids.update({k: bool(random.getrandbits(1)) for k in _ids.keys()})
    return _ids

async def async_test(_ids, t_process):
    _uuid = uuid.uuid4()
    _ids[_uuid] = None
    # t_process is actually a Future here
    await t_process
    return _ids[_uuid]

# Set up a future to use as a callback.
future = asyncio.Future()

t1 = asyncio.create_task(async_test(_ids, future))
t2 = asyncio.create_task(async_test(_ids, future))
t3 = asyncio.create_task(async_test(_ids, future))
t4 = asyncio.create_task(async_test(_ids, future))

await do_something_with_ids(_ids)
future.set_result(0)

result = [await v for v in (t1, t2, t3, t4)]

display(
    result,
)
```
The results of this are:

```
_IDS-0: {}
[False, False, False, False]
```
So, scheduled tasks don't seem to be attempted until the main thread hits the first await:

```python
await do_something_with_ids(_ids)
```

Is there a way to await until blocked? If we could do that, we could await the t{x} tasks until they block and build a solution from there.
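As an illustration of that observation: created tasks only get to run when the current coroutine yields control back to the event loop, so one way to approximate "await until blocked" is to keep yielding (for example with `asyncio.sleep(0)`) until some visible side effect shows that every task has reached its await. This is only a sketch of the idea, using a shared list as that signal; `register` and `gate` are names invented for the example:

```python
import asyncio

async def register(name, registered, gate):
    registered.append(name)   # visible side effect: this task has started
    await gate.wait()         # first blocking await
    return name

async def main():
    registered = []
    gate = asyncio.Event()
    tasks = [asyncio.create_task(register(i, registered, gate)) for i in range(4)]

    print(registered)  # [] - nothing has run yet; main has not yielded

    # Yield to the loop until every task has reached its first await.
    while len(registered) < len(tasks):
        await asyncio.sleep(0)

    print(registered)  # [0, 1, 2, 3]
    gate.set()
    print(await asyncio.gather(*tasks))

asyncio.run(main())
```

The first print shows an empty list and the second shows all four names, which matches the observation above that tasks only start once the main coroutine yields.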