Python 高级 asyncio 测试
在我 上一篇文章中 ,我显示了 pytest 的 Fixture 系统和插入式基础架构是如何帮你编写更加干净优秀的测试的。Fixture 允许你为每个测试用例创建一个干净的事件循环实例。而插入式系统允许你编写实际上市 asyncio 协程的测试函数。在我写那篇文章时, Tin Tvrtkovic 创建了插入式 pytest-asyncio 。
总之,它让你可以这样:
import asyncio
import time
import pytest
@pytest.mark.asyncio
def test_coro(event_loop):
before = time.monotonic()
await asyncio.sleep(0.1, loop=event_loop)
after = time.monotonic()
assert after - before >= 0.1
来取代这样:
import asyncio
import time
def test_coro():
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
before = time.monotonic()
loop.run_until_complete(asyncio.sleep(0.1, loop=loop))
after = time.monotonic()
assert after - before >= 0.1
finally:
loop.close()
因此,使用 pytest-asyncio 显然改善你的测试 (当然,这个插件还能做更多东西!)。
在我努力做 aiomas 时,一些无法简单涵盖的额外需求出现了。 aiomas 基本上做的是在 asyncio 传输周围增加三个抽象层:
- channel 层允许你以一种请求-应答方式发送 JSON 或者 MsgPack 编码消息。这一层使用了与不同种类的传输一起工作的自定义协议:TCP 套接字,Unix 域套接字和名为 本地队列 的自定义传输。
- RPC 层在 channel 层之上创建了一个远程过程调用系统。
- agent 层(为多代理系统)隐藏了更多的网络相关的东西,并基本上让你编写那些通过网络连接调用其他类方法的类。
这里是 channel 层如何工作的一个简单例子:
import aiomas
async def handle_client(channel):
"""Handle a client connection."""
req = await channel.recv()
print(req.content)
await req.reply('cya')
await channel.close()
async def client():
"""Client coroutine: Send a greeting to the server and wait for a
reply."""
channel = await aiomas.channel.open_connection(('localhost', 5555))
rep = await channel.send('ohai')
print(rep)
await channel.close()
server = aiomas.run(aiomas.channel.start_server(
('localhost', 5555), handle_client))
aiomas.run(client())
server.close()
aiomas.run(server.wait_closed())
对于我们的测试的要求
所以,考虑到这一点,对于我的测试,我有以下要求:
- 对于每个测试,我需要一个干净的事件循环实例。
这可以使用 pytest-asyncio 提供的
event_loop
来解决。 - 每一个测试都应该使用一个可用的传输来运行 (TCP socket, Unix domain socket, …).
这在理论上可以使用
pytest.mark.parametrize()
装饰器解决 (稍后我们会看到,在我的例子中并不是这样的)。 - 每一个测试需要一个客户端协程。理想情况下,这将是测试本身。
pytest-asyncio 的
pytest.mark.asyncio
装饰器解决了这个问题。 - 每个测试需要一个带有与客户端连接相对应的自定义回调的服务器。不管测试输出是什么,都必须彻底关闭服务器
看起来一个 fixture 可以做到这点,但每个服务器都需要一个特定测试回调来处理客户端连接。这使得它困难得多。
- 如果一个测试失败了,我不希望看到任何“address already in use”错误。
pytest-asyncio 的
unused_tcp_port
fixture 可以一用。 - 我不想一直使用
loop.run_until_complete()
。再次,
pytest.mark.asyncio
装饰器解决了这个问题。
总结有待解决的问题:每个测试都需要至少两个 fixture(一个用于事件循环,另一个用于地址类型),但我想将它们结合成一个单一的 fixture。为建立服务器创建一个 fixture 也是不错的,但如何才能做到这一点呢?
第一个种方法
我们能做的第一件事是将循环和地址类型都放在一个 fixture 中。我们将称其为 ctx ( 测试上下文(test context) 的缩写)。使用 fixture 参数,也可以容易地为每个地址类型创建一个 fixture 实例:
import tempfile
import py
import pytest
class Context:
def __init__(self, loop, addr):
self.loop = loop
self.addr = addr
@pytest.fixture(params=['tcp', 'unix'])
def ctx(request, event_loop, unused_tcp_port, short_tmpdir):
"""Generate tests with TCP sockets and Unix domain sockets."""
addr_type = request.param
if addr_type == 'tcp':
addr = ('127.0.0.1', unused_tcp_port)
elif addr_type == 'unix':
addr = short_tmpdir.join('sock').strpath
else:
raise RuntimeError('Unknown addr type: %s' % addr_type)
ctx = Context(event_loop, addr)
return ctx
@pytest.yield_fixture()
def short_tmpdir():
"""Generate a short temp. dir for Unix domain sockets. The paths
provided by ptest's tmpdir fixture are too long on some platforms."""
with tempfile.TemporaryDirectory() as tdir:
yield py.path.local(tdir)
这让我们这样编写我们的测试:
import aiomas
@pytest.mark.asyncio
async def test_channel(ctx):
results = []
async def handle_client(channel):
req = await channel.recv()
results.append(req.content)
await req.reply('cya')
await channel.close()
server = await aiomas.channel.start_server(ctx.addr, handle_client)
try:
channel = await aiomas.channel.open_connection(ctx.addr)
rep = await channel.send('ohai')
results.append(rep)
await channel.close()
finally:
server.close()
await server.wait_closed()
assert results == ['ohai', 'cya']
This works already very nicely and every test using the 这已经工作良好,而且使用 ctx
fixture 的每个测试都为每个地址类型运行一次。
然而,有两个问题仍然存在:
- 我们的
ctx
fixture 总是需要一个未使用的 TCP 端口以及一个临时目录 —— 虽然在每种情况下,我们只需要其中之一。 - 建立服务器 (和关闭它) 也涉及一些代码,这些代码对于每个测试都是一样的,因此应该被移到一个 fixture 中。然而,一个
server
fixture 并不直接工作,因为每个服务器需要一个指定测试的回调,正如你在我们创建服务器的那一行(server = await ...
) 可以看到的。但没有server
fixture,对此我们就无法拆除……
让我们看看我们如何能够解决这些问题。
第二种方法
第一个问题可以通过我们的 fixture 接收的 request 对象的 getfuncargvalue()
方法来解决。使用这个方法,我们可以手工调用一个 fixture 函数:
@pytest.fixture(params=['tcp', 'unix'])
def ctx(request, event_loop):
"""Generate tests with TCP sockets and Unix domain sockets."""
addr_type = request.param
if addr_type == 'tcp':
port = request.getfuncargvalue('unused_tcp_port')
addr = ('127.0.0.1', port)
elif addr_type == 'unix':
tmpdir = request.getfuncargvalue('short_tmpdir')
addr = tmpdir.join('sock').strpath
else:
raise RuntimeError('Unknown addr type: %s' % addr_type)
ctx = Context(event_loop, addr)
return ctx
要解决第二个问题,我们可以扩展传递给每个测试的 Context
类。我们添加一个方法 Context.start_server(client_handler)
,在我们的测试中,我们可以调用这个方法。我们还添加了一个 finalize/teardown 部分到我们的 ctx
fixture 中,一旦完成了,它将关闭服务器。而我们还需要创建一些快捷功能:
import asyncio
import tempfile
import py
import pytest
class Context:
def __init__(self, loop, addr):
self.loop = loop
self.addr = addr
self.server = None
async def connect(self, **kwargs):
"""Create and return a connection to "self.addr"."""
return (await aiomas.channel.open_connection(
self.addr, loop=self.loop, **kwargs))
async def start_server(self, handle_client, **kwargs):
"""Start a server with the callback *handle_client* listening on
"self.addr"."""
self.server = await aiomas.channel.start_server(
self.addr, handle_client, loop=self.loop, **kwargs)
async def start_server_and_connect(self, handle_client,
server_kwargs=None,
client_kwargs=None):
"""Shortcut for::
await ctx.start_server(...)
channel = await ctx.connect()"
"""
if server_kwargs is None:
server_kwargs = {}
if client_kwargs is None:
client_kwargs = {}
await self.start_server(handle_client, **server_kwargs)
return (await self.connect(**client_kwargs))
async def close_server(self):
"""Close the server."""
if self.server is not None:
server, self.server = self.server, None
server.close()
await server.wait_closed()
@pytest.yield_fixture(params=['tcp', 'unix'])
def ctx(request, event_loop):
"""Generate tests with TCP sockets and Unix domain sockets."""
addr_type = request.param
if addr_type == 'tcp':
port = request.getfuncargvalue('unused_tcp_port')
addr = ('127.0.0.1', port)
elif addr_type == 'unix':
tmpdir = request.getfuncargvalue('short_tmpdir')
addr = tmpdir.join('sock').strpath
else:
raise RuntimeError('Unknown addr type: %s' % addr_type)
ctx = Context(event_loop, addr)
yield ctx
# Shutdown the server and wait for all pending tasks to finish:
aiomas.run(ctx.close_server())
aiomas.run(asyncio.gather(*asyncio.Task.all_tasks(event_loop),
return_exceptions=True))
使用这个额外的功能,我们的测试用例变得短得多,容易读得多,并且更加可靠:
import aiomas
@pytest.mark.asyncio
async def test_channel(ctx):
results = []
async def handle_client(channel):
req = await channel.recv()
results.append(req.content)
await req.reply('cya')
await channel.close()
channel = await ctx.start_server_and_connect(handle_client)
rep = await channel.send('ohai')
results.append(rep)
await channel.close()
assert results == ['ohai', 'cya']
ctx
fixture (和相关的 Context
类) 确实不是我写过的最短的 fixture,但它帮助我从我的测试中移除了约 200 行的样板文件代码(除了让它们更加可读和可维护)。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: 使用 pcase 进行模式匹配
下一篇: 惯用 Python:布尔表达式
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论