如何使用pytest嘲笑Asyncpg执行者和获取方法?

发布于 2025-02-10 07:58:48 字数 966 浏览 1 评论 0原文

我正在使用pytest和asyncpg加载并将项目保存到数据库中。如何测试以下两个函数?

import asyncpg
config = {host: "localhost", port: 5432, database: "postgres"}


async def read_data():
    async with asyncpg.create_pool(**config) as pool:
        async with pool.acquire() as conn:
            feeds = await conn.fetch(
                "select feed_id, url, etag, last_modified from feeds WHERE enabled=TRUE"
            )
            return feeds


async def write_data(items):
    async with asyncpg.create_pool(**config) as pool:
        async with pool.acquire() as conn:
            query = '''
                INSERT INTO
                    feed_items (feed_item_id, pubdate, link, guid, title, summary, content, author, feed_id) 
                VALUES
                    (
                        $1, $2, $3, $4, $5, $6, $7, $8, $9
                    )
                    ON CONFLICT DO NOTHING
            '''
            await conn.executemany(query, items)

I am using pytest and asyncpg to load and save items to the database. How do I test the 2 functions below?

import asyncpg
config = {host: "localhost", port: 5432, database: "postgres"}


async def read_data():
    async with asyncpg.create_pool(**config) as pool:
        async with pool.acquire() as conn:
            feeds = await conn.fetch(
                "select feed_id, url, etag, last_modified from feeds WHERE enabled=TRUE"
            )
            return feeds


async def write_data(items):
    async with asyncpg.create_pool(**config) as pool:
        async with pool.acquire() as conn:
            query = '''
                INSERT INTO
                    feed_items (feed_item_id, pubdate, link, guid, title, summary, content, author, feed_id) 
                VALUES
                    (
                        $1, $2, $3, $4, $5, $6, $7, $8, $9
                    )
                    ON CONFLICT DO NOTHING
            '''
            await conn.executemany(query, items)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

美人迟暮 2025-02-17 07:58:48

我们可以模拟数据库调用,但是这样做时没有太多测试。您的功能只需读取/编写一些数据,逻辑或条件不多。我的建议是将邮政实例旋转在码头容器中,并在您的测试旁边进行测试。

话虽如此,在下面,您可以找到如何模拟数据库调用并测试一些基本功能。您将需要安装pytest-assyncio才能使其工作。

from unittest.mock import patch

import pytest
import pytest_asyncio

from src.get_events import read_data, write_data


# fixture which mocks database behavior
# take note that you have to mock __aenter__ on the calls as you are
# using with statements in your code
@pytest_asyncio.fixture
async def db_conn():
    with patch("src.get_events.asyncpg") as mock_pool:
        mock_pool.configure_mock(
            **{
                "create_pool.return_value.__aenter__.return_value": mock_pool,
                "acquire.return_value.__aenter__.return_value": mock_pool
            }
        )

        yield mock_pool


@pytest.mark.asyncio
async def test_read_data(db_conn):
    expected = [1, 2, 3]
    expected_query = "select feed_id, url, etag, last_modified from feeds WHERE enabled=TRUE"

    async def fetch():
        return expected
    
    # async function calls need to return an awaitable
    db_conn.fetch.return_value = fetch()

    feeds = await read_data()
    
    assert feeds == expected
    # assert function was called with the query we expect
    db_conn.fetch.assert_called_once_with(expected_query)


@pytest.mark.asyncio
async def test_write_data(db_conn):
    async def sink(*args, **kwargs):
        return

    items = [3, 4]
    db_conn.executemany.return_value = sink()
    await write_data(items)

    db_conn.executemany.assert_called_once()
    kall = db_conn.executemany.call_args
    # confirm that the items passed in to the function are the same ones
    # being sent to the database
    assert items in kall.args

=============================================================== test session starts ===============================================================
platform darwin -- Python 3.8.9, pytest-7.0.1, pluggy-1.0.0
cachedir: .pytest_cache
rootdir: **
asyncio: mode=strict
collected 2 items                                                                                                                                 

tests/test_script.py::test_read_data PASSED
tests/test_script.py::test_write_data PASSED

================================================================ 2 passed in 0.03s ================================================================

We can mock the database calls, however there isn't much to test when doing that. Your functions simply read/write some data, there isn't much logic or conditionals to verify. My recommendation would be to spin up a Postgres instance in a Docker container alongside your tests and test by hitting that.

With that being said, below you can find how to mock your database calls and test some basic functionality. You will need to install pytest-asyncio to get this to work.

from unittest.mock import patch

import pytest
import pytest_asyncio

from src.get_events import read_data, write_data


# fixture which mocks database behavior
# take note that you have to mock __aenter__ on the calls as you are
# using with statements in your code
@pytest_asyncio.fixture
async def db_conn():
    with patch("src.get_events.asyncpg") as mock_pool:
        mock_pool.configure_mock(
            **{
                "create_pool.return_value.__aenter__.return_value": mock_pool,
                "acquire.return_value.__aenter__.return_value": mock_pool
            }
        )

        yield mock_pool


@pytest.mark.asyncio
async def test_read_data(db_conn):
    expected = [1, 2, 3]
    expected_query = "select feed_id, url, etag, last_modified from feeds WHERE enabled=TRUE"

    async def fetch():
        return expected
    
    # async function calls need to return an awaitable
    db_conn.fetch.return_value = fetch()

    feeds = await read_data()
    
    assert feeds == expected
    # assert function was called with the query we expect
    db_conn.fetch.assert_called_once_with(expected_query)


@pytest.mark.asyncio
async def test_write_data(db_conn):
    async def sink(*args, **kwargs):
        return

    items = [3, 4]
    db_conn.executemany.return_value = sink()
    await write_data(items)

    db_conn.executemany.assert_called_once()
    kall = db_conn.executemany.call_args
    # confirm that the items passed in to the function are the same ones
    # being sent to the database
    assert items in kall.args

=============================================================== test session starts ===============================================================
platform darwin -- Python 3.8.9, pytest-7.0.1, pluggy-1.0.0
cachedir: .pytest_cache
rootdir: **
asyncio: mode=strict
collected 2 items                                                                                                                                 

tests/test_script.py::test_read_data PASSED
tests/test_script.py::test_write_data PASSED

================================================================ 2 passed in 0.03s ================================================================
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文