Pytest Alembic: initialize database with async migrations

Posted on 2025-02-11 22:14:01

The existing posts didn't provide a useful answer to me.

I'm trying to run asynchronous database tests using Pytest (db is Postgres with asyncpg), and I'd like to initialize my database using my Alembic migrations so that I can verify that they work properly in the meantime.

My first attempt was this:

@pytest.fixture(scope="session")
async def tables():
    """Initialize a database before the tests, and then tear it down again"""
    alembic_config: config.Config = config.Config('alembic.ini')
    command.upgrade(alembic_config, "head")
    yield
    command.downgrade(alembic_config, "base")

which didn't actually do anything at all (migrations were never applied to the database, tables not created).
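One possible explanation, offered as an assumption rather than something confirmed in this thread: if pytest-asyncio is not managing the fixture (for example in strict mode, where async fixtures are declared with @pytest_asyncio.fixture), pytest only ever obtains an async generator object from tables() and never runs its body. The stdlib-only sketch below shows that behaviour. The list `ran` and the helper `drive` are illustrative names, and the appends stand in for command.upgrade / command.downgrade.

```python
import asyncio

ran = []  # records which parts of the fixture body actually executed


async def tables():
    ran.append("upgrade")    # stands in for command.upgrade(...)
    yield
    ran.append("downgrade")  # stands in for command.downgrade(...)


# Calling the function only builds an async generator object.
gen = tables()
body_ran_after_call = bool(ran)  # nothing has executed yet


async def drive():
    # Something has to iterate the generator inside an event loop.
    await gen.__anext__()        # runs the setup part, up to the yield
    after_setup = list(ran)
    try:
        await gen.__anext__()    # runs the teardown part
    except StopAsyncIteration:
        pass
    return after_setup


after_setup = asyncio.run(drive())
```

If this is what happened, the migration code was never reached, which would match the symptom of no tables being created.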

Both Alembic's documentation & Pytest-Alembic's documentation say that async migrations should be run by configuring your env like this:

async def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

asyncio.run(run_migrations_online())

but this doesn't resolve the issue (however it does work for production migrations outside of pytest).

I stumbled upon a library called pytest-alembic that provides some built-in tests for this.

When running pytest --test-alembic, I get the following exception:

got Future attached to a different loop

A few comments on pytest-asyncio's GitHub repository suggest that the following fixture might fix it:

@pytest.fixture(scope="session")
def event_loop() -> Generator:
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

but it doesn't (same exception remains).
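For reference, the "attached to a different loop" message can be reproduced with plain asyncio whenever an awaitable created for one event loop is awaited from a task running on another. The sketch below only illustrates that mechanism; it is not a reconstruction of what pytest-alembic or asyncpg do internally, and `loop_a`, `loop_b` and `wait_on` are hypothetical names.

```python
import asyncio

loop_a = asyncio.new_event_loop()
loop_b = asyncio.new_event_loop()

# A future that belongs to loop_a (think: a connection created on one loop).
fut = loop_a.create_future()


async def wait_on(f):
    await f


message = None
try:
    # Awaiting loop_a's future from a task that runs on loop_b fails.
    loop_b.run_until_complete(wait_on(fut))
except RuntimeError as exc:
    message = str(exc)
finally:
    loop_a.close()
    loop_b.close()
```

In a test suite, this usually means an engine or connection was created on one loop (for example at import time or in a session fixture) and then used from a test running on a different loop.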

Next I tried to run the upgrade test manually, using:

async def test_migrations(alembic_runner):
    alembic_runner.migrate_up_to("revision_tag_here")

which gives me

alembic_runner.migrate_up_to("revision_tag_here")

venv/lib/python3.9/site-packages/pytest_alembic/runner.py:264: in run_connection_task
return asyncio.run(run(engine))

RuntimeError: asyncio.run() cannot be called from a running event loop

However, this is an internal call by pytest-alembic; I'm not calling asyncio.run() myself, so I can't apply any of the fixes suggested online for this (a try/except that checks for an existing event loop to use, etc.). I'm sure this isn't related to my own asyncio.run() defined in the Alembic env, because if I add a breakpoint, or just raise an exception above it, the line is never actually executed.
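The RuntimeError above is what asyncio.run() raises whenever it is called on a thread that already has a running loop, which is the case inside an `async def` test. A minimal stdlib sketch, assuming a hypothetical helper `run_connection_task` that stands in for any library function calling asyncio.run() internally (`fake_migration`, `sync_test` and `async_test` are illustrative names too):

```python
import asyncio


async def fake_migration():
    return "migrated"


def run_connection_task():
    # Hypothetical stand-in for a library helper that calls asyncio.run().
    coro = fake_migration()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


def sync_test():
    # No loop is running here, so the nested asyncio.run() is fine.
    return run_connection_task()


async def async_test():
    # A loop is already running here, so asyncio.run() refuses to start.
    try:
        return run_connection_task()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


sync_result = sync_test()
async_result = asyncio.run(async_test())
```

Under that reading, declaring the test as a plain `def test_migrations(alembic_runner)` may avoid the nesting, provided nothing else keeps a loop running on the same thread.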

Lastly, I've also tried nest_asyncio.apply(), which just hangs forever.

A few more blog posts suggest using this fixture to initialize database tables for tests:

    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

which works for the purpose of creating a database to run tests against, but it doesn't run through the migrations, so it doesn't help my case.

I feel like I've tried everything there is & visited every docs page, but I've got no luck so far. Running an async migration test surely can't be this difficult?

If any extra info is required I'm happy to provide it.

只怪假的太真实 2025-02-18 22:14:01

I got this up and running pretty easily with the following

env.py - the main idea here is that the migration can be run synchronously

import asyncio
from logging.config import fileConfig

from alembic import context
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine

config = context.config

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# NOTE: `mymodel` stands for the module that defines your declarative Base; import it above.
target_metadata = mymodel.Base.metadata


def run_migrations_online():
    connectable = context.config.attributes.get("connection", None)
    if connectable is None:
        connectable = AsyncEngine(
            engine_from_config(
                context.config.get_section(context.config.config_ini_section),
                prefix="sqlalchemy.",
                poolclass=pool.NullPool,
                future=True
            )
        )

    if isinstance(connectable, AsyncEngine):
        asyncio.run(run_async_migrations(connectable))
    else:
        do_run_migrations(connectable)


async def run_async_migrations(connectable):
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


def do_run_migrations(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
    )
    with context.begin_transaction():
        context.run_migrations()


run_migrations_online()

then I added a simple db init script
init_db.py

from alembic import command
from alembic.config import Config
from sqlalchemy.ext.asyncio import create_async_engine

__config_path__ = "/path/to/alembic.ini"
__migration_path__ = "/path/to/folder/with/env.py"

cfg = Config(__config_path__)
cfg.set_main_option("script_location", __migration_path__)


async def migrate_db(conn_url: str):
    async_engine = create_async_engine(conn_url, echo=True)
    async with async_engine.begin() as conn:
        await conn.run_sync(__execute_upgrade)


def __execute_upgrade(connection):
    cfg.attributes["connection"] = connection
    command.upgrade(cfg, "head")

then your pytest fixture can look like this
conftest.py

...

@pytest_asyncio.fixture(autouse=True)
async def migrate():
    await migrate_db(conn_url)
    yield

...

Note: I don't scope my migrate fixture to the test session; I tend to drop and migrate after each test.

日裸衫吸 2025-02-18 22:14:01

There's an async template provided by Alembic now, see https://alembic.sqlalchemy.org/en/latest/cookbook.html#using-asyncio-with-alembic.

You can generate the async env.py with the following:

alembic init -t async <script_directory_here>

But keep reading through to https://alembic.sqlalchemy.org/en/latest/cookbook.html#programmatic-api-use-connection-sharing-with-asyncio, which advises changing run_migrations_online to the following:

def run_migrations_online():
    """Run migrations in 'online' mode."""

    connectable = config.attributes.get("connection", None)

    if connectable is None:
        asyncio.run(run_async_migrations())
    else:
        do_run_migrations(connectable)
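The point of that change is the dispatch: when a caller has already placed a connection in config.attributes, env.py reuses it synchronously and never calls asyncio.run(). A stdlib-only sketch of just that branching, where the `attributes` dict stands in for Alembic's config.attributes and the `calls` list exists purely for illustration (neither is Alembic code, and the real functions would run migrations instead of recording calls):

```python
import asyncio

attributes = {}  # stand-in for alembic's config.attributes
calls = []       # records which path was taken (illustration only)


def do_run_migrations(connection):
    # A real env.py would call context.configure(...) and run_migrations().
    calls.append(("sync", connection))


async def run_async_migrations():
    # A real env.py would open an async connection here and call
    # connection.run_sync(do_run_migrations).
    calls.append(("async", None))


def run_migrations_online():
    connectable = attributes.get("connection", None)
    if connectable is None:
        # CLI usage: no loop is running yet, so asyncio.run() is safe.
        asyncio.run(run_async_migrations())
    else:
        # Programmatic usage: the caller already holds a connection
        # (for example inside conn.run_sync), so stay synchronous.
        do_run_migrations(connectable)


run_migrations_online()                      # CLI-style call
attributes["connection"] = "fake-connection"
run_migrations_online()                      # shared-connection call
```

This is also why the first answer's init_db.py sets cfg.attributes["connection"] before calling command.upgrade from inside run_sync.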