如何动态的将一个future添加到一个正在运行的asyncio模块的事件循环中?

发布于 2022-09-05 10:25:01 字数 3090 浏览 14 评论 0

自学asyncio模块中,问题多多~
python3.5 的 asyncio 模块中,一般都是通过 事件循环对象 的 run_until_complete(future) 来异步地运行多个协程的,参数 future 大都是由 asyncio.wait() 或 asyncio.gather() 将多个本地协程函数封装成一个future,但问题是这 “ 多个本地协程函数 ” 是一次写死的,可不可以在 run_until_complete 运行过程中动态添加呢?或者用其他方法来达成类似效果?

比如在处理socket模块那典型的C/S模型的服务端的时候 :

#! /usr/bin/env python3
# -*- coding:utf-8 -*-

# 一个未完成的 异步echo-server

import asyncio,socket,queue

server = socket.socket()
server.bind(('0.0.0.0',12345))
server.listen()

client_queue = queue.Queue()

async def accept():
    return server.accept()

async def get_client_socket():
    while 1:
        client, addr = await accept()
        print('%s has connected'%str(addr))
        client_queue.put((client,addr))

async def send(sock,data):
    sock.send(data)

async def recv(sock):
    return sock.recv(1024)

async def interactive():
    client, addr = client_queue.get()
    while 1:
        receive = await recv(client)
        print('Received %r from %r'%(receive,addr))

        await send(client,receive)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    '''
    如果写
    loop.run_until_complete(asyncio.gather(
        get_client_socket(),
        interactive()
    ))
    肯定不对...
    到底该怎么实现“server 不断生产新的负责与客户端连接的socket”的get_client_socket协程和“每个‘负责与客户端连接的socket’ 与客户端交互”的interactive协程之间的并行呢?
    '''

如上例,要如何实现
“ server 不断生成新的负责与客户端连接的 socket” 的 get_client_socket协程

“每个‘负责与客户端连接的socket’ 与客户端交互”的 interactive协程
之间的异步并行呢?


看了某大神的文章(http://python.jobbole.com/873...,试着修改了下

#! /usr/bin/env python3
# -*- coding:utf-8 -*-

# 一个有毛病的 异步echo-server

import asyncio,threading,socket

server = socket.socket()
server.bind(('127.0.0.1',12345))
server.listen()

async def recv_data(conn):
    return conn.recv(1024)


async def send_data(conn,data):
    conn.send(data)


async def interactive(conn,addr):
    while 1:
        receive = await recv_data(conn)
        print('Received %s from %s'%(receive.decode('utf-8'),repr(addr)))
        await send_data(conn,receive)


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

new_loop = asyncio.new_event_loop()

t = threading.Thread(target=start_loop,args=(new_loop,))
t.start()


while 1:
    conn,addr = server.accept()
    print('%s has connected'%repr(addr))
    asyncio.run_coroutine_threadsafe(interactive(conn,addr),new_loop)
    print('New_loop\'s tasks num:',len(asyncio.Task.all_tasks(new_loop)))

简化版的客户端:

#! /usr/bin/env python3
# -*- coding:utf-8 -*-

# 简化版 echo-client

import socket

client = socket.socket()
client.connect(('127.0.0.1',12345))

while 1:
    msg = input('>> ').strip()
    client.send(msg.encode())
    receive = client.recv(1024)
    print('Received:',receive.decode('utf-8'))

启动一个服务端,启动3个客户端,3个客户端分别向服务端发送消息,服务端的回显如下:图片描述

问题:
如图所示,为什么只有1个客户端可以与服务端交互?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

绮筵 2022-09-12 10:25:01

看这里
查找
不同线程的事件循环
希望能帮到你

梦回梦里 2022-09-12 10:25:01

也是一个刚学的新手,不知道对不对。如果没理解错,楼主是和我一样想在async的函数在运行的时候往loop里动态添加新的Task。答案应该就是在楼上链接里面 “协程嵌套” 那章,在async的main里添加了新的Task


PS:发现答错了,协程嵌套的新协程和main不是同一级的协程,要等到所有新协程执行完才会回到main...看来只有楼上的办法了,在另一个线程里新增协程才不会await


PSS:尝试了下,似乎可行的方案是:在loop开始之前用asyncio.ensure_future()新增协程,紧接着loop.run_forever(),在协程里如果想添加新的协程,同样用asyncio.ensure_future()新增协程,因为没有使用await,所以这样就不会挂起新协程了。
如果想停掉loop,那就把loop传入到协程内,调用loop.stop()

趁年轻赶紧闹 2022-09-12 10:25:01

正好也遇到了这个问题, 花了老长时间解决, 希望对其他人有所帮助.

关于楼主的echo server只能与单个客户端通信的问题, 主要有两点:

  1. 设置套接字为非阻塞模式
  2. 自定义的accept, recvsend协程函数应该还是差点东西.

我后来找到了这篇文章从 asyncio 简单实现看异步是如何工作的, 它用loop.create_task()函数动态添加任务到事件循环, 然后用asyncio内置的底层异步协程函数sock_accept, sock_recv, sock_send完成echo server.

这是我写的小demo 协程模型回显服务器, 如果觉得对你帮助, 顺便帮我点个星, 谢谢.

小矜持 2022-09-12 10:25:01

其实是动态的向一个已经在运行的LOOP中怎么添加任务的问题吧.
我这个例子你试试看.
在主线程中运行一个LOOP, 然后开10个子线程,获取主线程中的 queue 的数据.
当然你也可以反着写.
asyncio.run_coroutine_threadsafe
loop.call_soon_threadsafe

你值得拥有

PS :PYTHON3.7

import asyncio

import threading

import random





async def fn(name,qu):
    while True:
        print(name,"ready to work")
        r=asyncio.run_coroutine_threadsafe(qu.get(),qu._loop)
        rs=r.result()
        print(name,"get work,working",rs)
        if rs=="over":
            asyncio.run_coroutine_threadsafe(qu.put("over"),qu._loop)
            print(name,"over")

        await asyncio.sleep(random.randint(1,3))
        print(name,"done,sleep")
        await asyncio.sleep(random.randint(1,3))
        print(name,"wake up")
        if rs=="over":            
            break

def t(name,qu):
    loop=asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(fn(name,qu))
loop=asyncio.new_event_loop()  
asyncio.set_event_loop(loop)  
qu=asyncio.Queue()
tl=[threading.Thread(target=t,args=(i,qu)) for i in range(5)]
for i in tl:
    i.setDaemon(True)
    i.start()


async def pt(qu):
    for i in range(20):
        await asyncio.sleep(0.8)
        await qu.put(i)
        print("work need to do",qu)
    await qu.put("over")
    await asyncio.sleep(5)

loop.run_until_complete(pt(qu))

loop.close()
豆芽 2022-09-12 10:25:01
import asyncio 
tq = asyncio.Queue()
async def forever():
    while True:
        task = await tq.get()
        await task[0](task[1])
async def funcname(param):
    a = await asyncio.sleep(1)
    await tq.put((funcname,param))
    print(a)
for i in range(100):
    tq.put((funcname,tq))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([forever() for i in range(1000)]))
loop.close()

把协程加入一个已经开始运行的循环可以这样写,把协程加入队列,再由别的协程取出执行,如果需要返回值就把返回值放入另一个队列

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文