
SQLAlchemy 2.0

Published on 2024-06-23 19:15:23

This article explains the changes in SQLAlchemy 1.4 and 2.0, and how GINO adapts to them.

SQLAlchemy 2.0 will deliver many breaking API changes, and SQLAlchemy 1.4 will be the "interim" version for people to eventually upgrade their software to use SQLAlchemy 2.0.

GINO    SQLAlchemy  Dialect   Comments
1.0.x   1.3.x       Custom    Current (old-)stable.
1.1.x   1.3.x       Custom    Next old-stable.
1.2.x   1.3.x       Custom    Future old-stable (maybe).
1.4.x   1.4.x       Upstream  2.0 Interim.
2.0.x   2.0.x       Upstream  Future stable.
2.1.x   2.0.x       Upstream  Future stable iterations.

To make things easier, GINO will (luckily) follow the same version numbers for the transition. That is, GINO 1.4 will require SQLAlchemy 1.4 and provide pre-1.4-compatible APIs with deprecation warnings, plus options to switch to the 2.0 API; GINO 2.0 will require SQLAlchemy 2.0 and provide the new APIs only.

At the same time, GINO 1.1, 1.2 and 1.3 are reserved as old-stable versions that add new features on top of SQLAlchemy 1.3. After 2.0, GINO versions will no longer match SQLAlchemy versions.

The Async Solution

Among all the exciting updates in SQLAlchemy 1.4 / 2.0, native async support is the most significant change for GINO. Simply put, SQLAlchemy 1.4 uses greenlet to mix asynchronous code into its existing code base, which avoids having to make everything async.

Let's say we have an asynchronous method to create an asyncpg connection:

import asyncpg

async def connect():
    return await asyncpg.connect("postgresql:///")

And an end-user method to use it:

async def main():
    conn = await connect()
    now = await conn.fetchval("SELECT now()")

Now instead of directly calling connect() from main(), I would like to add some additional logic - let's say, a sanity check:

async def safe_connect():
    conn = await connect()
    try:
        await conn.execute("SELECT 1")
    except Exception:
        return None
    else:
        return conn

Then the end-user should modify main() to:

async def main():
    conn = await safe_connect()
    if conn:
        now = await conn.fetchval("SELECT now()")

OK, everything works so far, as it is all regular async code. Here's the interesting part: suppose the logic in safe_connect() must not live in an async def function. With SQLAlchemy 1.4+, we could write:

from sqlalchemy.util import await_only, greenlet_spawn

def sync_safe_connect():
    conn = await_only(connect())
    try:
        await_only(conn.execute("SELECT 1"))
    except Exception:
        return None
    else:
        return conn

async def safe_connect():
    return await greenlet_spawn(sync_safe_connect)

Behind the scenes, greenlet_spawn() runs the given "sync" function in a greenlet; inside it, await_only() switches to the event loop to bridge the underlying async methods. As sync_safe_connect() is just a normal Python function, you can imagine how it works together with lots of other "sync" code asynchronously.
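greenlet is a third-party package, so here is a rough standard-library analogy of the same shape: a plain def function that waits for a coroutine, wrapped by a thin async def. Note that this swaps in a different mechanism. A worker thread stands in for the greenlet, asyncio.run_coroutine_threadsafe() stands in for await_only(), and run_in_executor() stands in for greenlet_spawn(). It is not how SQLAlchemy is implemented, and connect() here is a fake that only sleeps.

```python
import asyncio


async def connect():
    # Fake async driver call; a real driver would open a socket here.
    await asyncio.sleep(0.01)
    return "fake-connection"


def sync_safe_connect(loop):
    # A plain function: no "async def", no "await".
    # Stand-in for await_only(): hand the coroutine to the event loop and
    # block this worker thread until the result is ready.
    future = asyncio.run_coroutine_threadsafe(connect(), loop)
    return future.result(timeout=5)


async def safe_connect():
    loop = asyncio.get_running_loop()
    # Stand-in for greenlet_spawn(): run the sync function off the loop,
    # so the loop stays free to drive connect().
    return await loop.run_in_executor(None, sync_safe_connect, loop)


if __name__ == "__main__":
    print(asyncio.run(safe_connect()))
```

The difference that matters is that a greenlet switches inside the same OS thread, so the greenlet approach needs no thread pool.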

We're not going deeper into the implementation, but this is basically how SQLAlchemy 1.4 mixes the asyncpg driver into its "sync" code base while still providing async APIs on top of it.

Async SQLAlchemy

Although greenlet might be the only way to practically port SQLAlchemy to the async world natively without having to maintain 2 copies of the same code, introducing implicit asynchrony to a large sync code base is still a risky move.

The sync library has existed for years, with many assumptions such as using threading.Lock to control concurrency. When switching to asynchronous programming, such primitives and assumptions usually need to be reviewed and probably replaced by e.g. asyncio.Lock. However, the implicit approach is so convenient that most of the blocking code just works without any changes at all, which lowers the odds that anyone reviews it and finds possible issues.

As a matter of fact, some issues can only be revealed under (heavy) concurrency. For example, threading.Lock.acquire() actually works fine in a single coroutine, but 2 concurrent coroutines acquiring the same threading.Lock may easily block the main thread forever. The same applies to data structures like queues, etc. In short, there must be nothing to block the main thread.
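To make the lock problem concrete, here is a small sketch using only asyncio and threading. The function names are made up for this illustration. The first half holds a threading.Lock across an await and then tries to take it again from another coroutine; a timeout is used so the demo reports the failure instead of hanging. The second half does the same with asyncio.Lock, which waits cooperatively.

```python
import asyncio
import threading


async def hold(lock, held):
    lock.acquire()                 # fine: nobody else holds it yet
    try:
        held.set()
        await asyncio.sleep(0.05)  # context switch while holding the lock
    finally:
        lock.release()


async def try_threading_lock():
    lock = threading.Lock()
    held = asyncio.Event()
    holder = asyncio.create_task(hold(lock, held))
    await held.wait()
    # A plain lock.acquire() here would never return: it blocks the only
    # thread, so hold() can never resume to release the lock. The timeout
    # lets us observe the failure instead of hanging.
    acquired = lock.acquire(timeout=0.01)
    await holder
    return acquired


async def try_asyncio_lock():
    lock = asyncio.Lock()
    order = []

    async def worker(name):
        async with lock:           # yields to the loop while waiting
            order.append(name + " in")
            await asyncio.sleep(0.01)
            order.append(name + " out")

    await asyncio.gather(worker("a"), worker("b"))
    return order


if __name__ == "__main__":
    print(asyncio.run(try_threading_lock()))  # False
    print(asyncio.run(try_asyncio_lock()))
```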

Lastly, implicit asynchrony is a potential maintenance risk. Because the context switches are usually hidden behind regular sync methods, it is easy to forget that such methods may lead to concurrency issues. Treating coroutines like OS threads is a good idea and usually works, but they are fundamentally different. Extra care is always needed when trying to support both sync and async with the same code base.

However, such risks are mostly contained within SQLAlchemy itself. End-users are still using regular explicit asynchronous APIs to leverage the async DB drivers. With proper reviewing, testing and sufficient community exposure (that's GINO's part), it is still possible and reliable to have a single SQLAlchemy with 2 sets of APIs (sync + async). For sure, the APIs are not 100% identical - for example, the ORM lazy-loading won't work because there is no place for await in accessing attributes (GINO doesn't like such implicitness anyways, so yeah).

To quickly get a picture of async SQLAlchemy (Core), here's a sample from SQLAlchemy:

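import asyncio

from sqlalchemy import Column, Integer, MetaData, String, Table, select
from sqlalchemy.ext.asyncio import create_async_engine

# The sample assumes a MetaData object and a table t1 along these lines.
meta = MetaData()
t1 = Table(
    "t1", meta, Column("id", Integer, primary_key=True), Column("name", String)
)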

async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
    )

    async with engine.begin() as conn:
        await conn.run_sync(meta.drop_all)
        await conn.run_sync(meta.create_all)

        await conn.execute(
            t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
        )

    async with engine.connect() as conn:

        # select a Result, which will be delivered with buffered
        # results
        result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))

        print(result.fetchall())


asyncio.run(async_main())

Auto-Commit Complication

After using PostgreSQL for a long time, I took many features for granted. The auto-commit feature is one of them. We all know that BEGIN starts a transaction and COMMIT / ROLLBACK ends it. But what happens to SQL statements that are not wrapped in BEGIN ... COMMIT blocks?

If you do not issue a BEGIN command, then each individual statement has an implicit BEGIN and (if successful) COMMIT wrapped around it.

—PostgreSQL Documentation, 3.4. Transactions

And yes, there is an implicit ROLLBACK if the statement fails. This is not directly named an "auto-commit" feature, but PostgreSQL does enforce it. Now imagine a database whose "auto-commit" feature can be turned off - such individual statements are either executed without ACID transactions at all (no surprise, there are databases without ACID), or the session is left with an open-ended transaction to be committed later.

PEP 249 (DB-API 2.0) - the standard for Python database drivers - was created against this background. Its assumption appears to me to be that the database may not support ACID at all and, if it does, usually does not support auto-commit, because it defines only commit() and rollback() on a connection, but no begin(). So I think DB-API assumes that, when executing statements, the connection is automatically put in a transaction (if ACID is supported), and you have to call commit() to persist your changes. Closing a connection causes pending transactions to be rolled back automatically.

Note that if the database supports an auto-commit feature, this (the auto-commit feature -- GINO comments) must be initially off.

—PEP 249

As the API behavior is defined this way, even drivers for databases like PostgreSQL, with its enforced "auto-commit", have to mimic such behavior. For example, psycopg2 will by default automatically emit a BEGIN to the database upon the first execution, so that the execution is not auto-committed. The implicit transaction boundary is a very evil thing - people constantly leave transactions open (ever seen IDLE IN TRANSACTION?), sometimes even holding database locks and eventually causing a deadlock storm.
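You can watch a DB-API driver open such an implicit transaction without a PostgreSQL server at hand. The sketch below uses the standard-library sqlite3 module as a stand-in, purely because it ships with Python; it is a different driver from psycopg2, but by default it also starts a transaction silently before an INSERT. The function name is made up for this illustration.

```python
import os
import sqlite3
import tempfile


def implicit_transaction_demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")

        conn = sqlite3.connect(path)  # default transaction handling
        conn.execute("CREATE TABLE t (x INTEGER)")
        before = conn.in_transaction
        conn.execute("INSERT INTO t VALUES (1)")  # driver begins implicitly
        after = conn.in_transaction
        conn.close()                  # no commit(): the INSERT is lost

        conn = sqlite3.connect(path)
        rows = conn.execute("SELECT count(*) FROM t").fetchone()[0]
        conn.close()
        return before, after, rows


if __name__ == "__main__":
    print(implicit_transaction_demo())  # (False, True, 0)
```

Nobody called begin(), yet a transaction was open after the INSERT, and forgetting commit() silently discarded the row.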

To work around this workaround, PEP 249 does say:

An interface method may be provided to turn it (the auto-commit feature) back on.

So for psycopg2, one could do this:

import psycopg2

conn = psycopg2.connect("postgresql:///")
conn.autocommit = True
conn.cursor().execute("SELECT now()")

Now the database correctly receives this SELECT statement only, without any implicit BEGIN surprises. But when we want to have explicit transactions, DB-API only gives us 2 options: 1) Do it manually:

conn.cursor().execute("BEGIN")
conn.cursor().execute("UPDATE ...")
conn.cursor().execute("COMMIT")

Or 2) turn auto-commit off again:

conn.autocommit = False
conn.cursor().execute("UPDATE ...")
conn.commit()

I know this is frustrating (or maybe people have accepted it), but newer database drivers like asyncpg do provide a cleaner API by not complying with PEP 249:

import asyncpg

async def main():
    conn = await asyncpg.connect("postgresql://")

    print(await conn.fetchval("SELECT now()"))  # SELECT now();

    async with conn.transaction():              # BEGIN;
        await conn.execute("UPDATE ...")        # UPDATE ...;
                                                # COMMIT;

It's much cleaner to see what's actually happening on the wire to the database. This is also how GINO works.
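The same "what you write is what gets sent" style can be sketched on top of a DB-API driver too. Below is a minimal illustration with the standard-library sqlite3 module in its autocommit mode (isolation_level=None) plus a small context manager. The transaction() helper is hypothetical and written for this example only; it is neither asyncpg's nor GINO's implementation.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn):
    """Hypothetical helper: explicit BEGIN, then COMMIT or ROLLBACK."""
    conn.execute("BEGIN")
    try:
        yield conn
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")


def wysiwyg_demo():
    # isolation_level=None turns off sqlite3's implicit BEGIN.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE t (x INTEGER)")

    conn.execute("INSERT INTO t VALUES (1)")      # auto-committed
    outside = conn.in_transaction                 # nothing left open

    with transaction(conn):                       # BEGIN;
        conn.execute("INSERT INTO t VALUES (2)")  # INSERT ...;
        inside = conn.in_transaction
    # leaving the block without an error sends COMMIT;

    try:
        with transaction(conn):                   # BEGIN;
            conn.execute("INSERT INTO t VALUES (3)")
            raise RuntimeError("boom")            # ROLLBACK;
    except RuntimeError:
        pass

    rows = [row[0] for row in conn.execute("SELECT x FROM t ORDER BY x")]
    conn.close()
    return outside, inside, rows


if __name__ == "__main__":
    print(wysiwyg_demo())  # (False, True, [1, 2])
```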

SQLAlchemy for DB-API

Because SQLAlchemy is built on PEP 249 (DB-API 2.0), its API is also greatly affected by the PEP standard. For example, imagine what SQL is actually executed by this code:

import sqlalchemy as sa

e = sa.create_engine("postgresql:///", future=True)
with e.connect() as conn:
    conn.scalar(sa.text("SELECT now()"))

Only SELECT now()? No. Here's the answer:

with e.connect() as conn:                 # BEGIN; SELECT version(); ...; ROLLBACK;
    conn.scalar(sa.text("SELECT now()"))  # BEGIN; SELECT now();
                                          # ROLLBACK;

Note

We are using SQLAlchemy 2.0 API for simplification, by setting future=True using SQLAlchemy 1.4. It's way more complicated in SQLAlchemy 1.3 and I don't want to get into that.

The reason behind this is that connections have "auto-commit" turned off by default. If there is no transaction, an implicit BEGIN is automatically executed upon the first statement. This applies to async SQLAlchemy too - even though the underlying asyncpg is not DB-API-compliant, the AsyncpgDialect in SQLAlchemy still wraps asyncpg and simulates a compatible DB-API. Like this:

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine

async def main():
    e = create_async_engine("postgresql+asyncpg:///")
    async with e.connect() as conn:                  # BEGIN; SELECT version(); ...; ROLLBACK;
        await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
                                                     # ROLLBACK;

If you want to modify the database permanently, you have to commit() the implicit transaction explicitly:

async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("UPDATE ..."))    # BEGIN; UPDATE ...;
        await conn.commit()                          # COMMIT;

Or use the explicit transaction API:

async def main():
    e = ...
    async with e.connect() as conn:
        async with conn.begin():                     # BEGIN;
            await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                                                     # COMMIT;

Please note that SQLAlchemy 2.0 doesn't allow soft-nested transactions. In other words, you cannot nest 2 async with conn.begin(): blocks like this:

async def main():
    e = ...
    async with e.connect() as conn:
        async with conn.begin():      # BEGIN;
            async with conn.begin():  # Error: a transaction is already begun
                ...

This limitation applies to implicit transactions too, even though it's weird:

async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
        async with conn.begin():  # Error: a transaction is already begun
            ...

You have to explicitly close this implicit transaction in order for an explicit transaction to start successfully:

async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
        await conn.rollback()                        # ROLLBACK;
        async with conn.begin():                     # BEGIN;
            await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                                                     # COMMIT;

Similar to Core, SQLAlchemy ORM follows the same principle. Grab a session, use it without begin(), and when you want to commit, commit(). Or, use an explicit transaction in a with session.begin(): block. Personally I don't like that much, but it is how SQLAlchemy manages transactions. Please refer to the SQLAlchemy documentation on managing transactions to read more.

SQLAlchemy AUTOCOMMIT

I know you already miss the WYSIWYG asyncpg and GINO API. Hang in there, let's build GINO 1.4 together with the SQLAlchemy AUTOCOMMIT feature.

To turn AUTOCOMMIT back on, we need to set the isolation_level to AUTOCOMMIT in execution_options:

async def main():
    e = create_async_engine(
        "postgresql+asyncpg:///",
        execution_options={"isolation_level": "AUTOCOMMIT"},
    )
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))  # SELECT now();

Hooray! No more implicit BEGIN magic. We're one step closer.

Note

There is also a keyword argument:

e = create_async_engine(
    "postgresql+asyncpg:///",
    isolation_level="AUTOCOMMIT",
)

But this is implemented very differently than execution_options, and I don't think it's working for GINO's use case.

The next question is, how do we explicitly start a transaction? Let's try begin():

async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))  # SELECT now();
        async with conn.begin():  # Error: a transaction is already begun
            ...

Wait a second ... we've seen this error before! It is the implicit transaction conflicting with the explicit transaction. But I thought we were in AUTOCOMMIT mode? Even though the isolation_level tells the driver not to send BEGIN to the database, SQLAlchemy still manages library-level transaction objects. We have to close the virtual implicit transaction before starting an explicit transaction:

async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))  # SELECT now();
        await conn.rollback()                        # no-op
        async with conn.begin():                     # no-op
            await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;

Well, not quite what we expected. With AUTOCOMMIT set, all of begin(), commit() and rollback() become no-ops.

Similar to the answers for psycopg2, we have 2 options here too: 1) manually execute transaction-control SQL statements, or 2) turn off AUTOCOMMIT temporarily. As we want to be more compatible with SQLAlchemy, let's try 2):

async def main():
    e = ...
    async with e.connect() as conn:
        await conn.execute(sa.text("SELECT now()"))  # SELECT now();
        await conn.rollback()                        # no-op
        async with conn.execution_options(
            isolation_level="READ COMMITTED"
        ).begin():                                   # BEGIN;
            await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                                                     # COMMIT;

It's working! According to the SQLAlchemy docs, execution_options() creates a shallow copy of the connection, and applies new values only to the copy. So the original connection should still be in AUTOCOMMIT, right? Well...

async def main():
    e = ...
    async with e.connect() as conn:
        ...
        async with conn.execution_options(
            isolation_level="READ COMMITTED"
        ).begin():                                   # BEGIN;
            await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                                                     # COMMIT;

        await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
                                                     # ROLLBACK;

Unfortunately, the implicit transaction is haunting us again. This is because both the original connection and its shallow copy point to the same "DB-API" connection (in this case, a SQLAlchemy wrapper around the asyncpg connection), and setting isolation_level modifies the value on that "DB-API" connection.
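A toy model may help to picture the sharing. The two classes below are hypothetical stand-ins written for this example only; they are not SQLAlchemy's classes and copy nothing from its source. They only show how a shallow copy can carry its own options while still mutating state on the one driver connection that both objects share.

```python
import copy


class FakeDBAPIConnection:
    """Hypothetical stand-in for the driver-level connection."""

    def __init__(self):
        self.isolation_level = "AUTOCOMMIT"


class FakeConnection:
    """Hypothetical stand-in for a connection wrapper with options."""

    def __init__(self, dbapi_conn, **options):
        self.dbapi_conn = dbapi_conn
        self.options = options

    def execution_options(self, **options):
        clone = copy.copy(self)                      # shallow: same dbapi_conn
        clone.options = {**self.options, **options}  # options are per copy
        if "isolation_level" in options:
            # The level itself lives on the shared driver connection.
            clone.dbapi_conn.isolation_level = options["isolation_level"]
        return clone


conn = FakeConnection(FakeDBAPIConnection(), isolation_level="AUTOCOMMIT")
tx_conn = conn.execution_options(isolation_level="READ COMMITTED")

print(conn.options["isolation_level"])   # AUTOCOMMIT
print(conn.dbapi_conn.isolation_level)   # READ COMMITTED
```

The original wrapper still claims AUTOCOMMIT in its own options, but the driver connection underneath it has already been switched.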

Returning a SQLAlchemy connection back to the pool resets the isolation_level to its default value, and acquiring the same connection again will initialize the isolation_level with values from execution_options of the engine. But if we want to keep using the same connection without returning, we have to manually overwrite its isolation_level again:

async def main():
    e = ...
    async with e.connect() as conn:
        ...
        async with conn.execution_options(
            isolation_level="READ COMMITTED"
        ).begin():                                   # BEGIN;
            await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                                                     # COMMIT;

        conn.execution_options(isolation_level="AUTOCOMMIT")
        await conn.execute(sa.text("SELECT now()"))  # SELECT now();

Eventually we made it!

Encapsulating all such logic, GINO 1.4 could then provide decent WYSIWYG APIs again:

import gino

async def main():
    engine = await gino.create_engine("postgresql:///")
    async with engine.acquire() as conn:
        await conn.scalar("SELECT now()")    # SELECT now();

        async with conn.transaction():       # BEGIN;
            await conn.status("UPDATE ...")  # UPDATE ...;
                                             # COMMIT;

Tip

Now I feel that "implementing" the auto-commit feature is more like restoring the original database behavior, and that having auto-commit turned off by default should be considered a new feature called "auto-begin" or "implicit transaction". It is a bad design introduced in the early PEP 249, affecting SQLAlchemy and the ecosystem.

Isolation Levels

So far, we have only used 2 isolation_level values:

  • AUTOCOMMIT

  • READ COMMITTED

AUTOCOMMIT is not a valid PostgreSQL isolation level. It's only recognized and consumed by the SQLAlchemy asyncpg dialect to bypass the "auto-begin" simulation.

READ COMMITTED is the default PostgreSQL isolation level. You can verify this by executing SQL directly in a transaction:

# BEGIN;
BEGIN

# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 read committed
(1 row)

# ROLLBACK;
ROLLBACK

To start a transaction in a different isolation level, you may:

# BEGIN ISOLATION LEVEL SERIALIZABLE;
BEGIN

# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 serializable
(1 row)

# ROLLBACK;
ROLLBACK

As mentioned earlier, all PostgreSQL statements are executed in a transaction. If no explicit BEGIN is in place, an implicit transaction is used. So this SQL also works on its own:

# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 read committed
(1 row)

But how could we modify the isolation level of such implicit transactions? The answer is to set isolation level session-wise. This affects all subsequent transactions, including both implicit and explicit ones, except for explicit transactions with explicit isolation levels:

# SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET

# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 serializable
(1 row)

# BEGIN;
BEGIN

# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 serializable
(1 row)

# ROLLBACK;
ROLLBACK

# BEGIN ISOLATION LEVEL READ COMMITTED;
BEGIN

# SHOW TRANSACTION ISOLATION LEVEL;
 transaction_isolation
-----------------------
 read committed
(1 row)

# ROLLBACK;
ROLLBACK

Then let's see how SQLAlchemy with asyncpg solves this problem:

async def main():
    e = create_async_engine(
        "postgresql+asyncpg:///",
        execution_options={"isolation_level": "SERIALIZABLE"},
    )
    async with e.connect() as conn:
        async with conn.begin():  # BEGIN ISOLATION LEVEL SERIALIZABLE;
            await conn.execute(sa.text("UPDATE ..."))  # UPDATE ...;
                                                       # COMMIT;

Under the hood, SQLAlchemy leverages asyncpg's Connection.transaction(isolation="...") to set the isolation level per transaction. In GINO, we just need to store the user-defined isolation level and set it before starting transactions.

But there are 2 issues:

  • The user-defined isolation level is not applied in PostgreSQL implicit transactions (a.k.a. auto-commit statements), because nothing issues SET SESSION CHARACTERISTICS.

  • asyncpg has a bug: Connection.transaction(isolation="read_committed") always emits BEGIN without an explicit isolation level, regardless of the actual default isolation level.
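To see why the second issue matters once the session default is no longer READ COMMITTED, here is a toy model of the behaviour described in that bullet. It is not asyncpg's code; both function names are made up for this illustration, and the model only encodes what the bullet states.

```python
def begin_statement(isolation):
    """Toy model of the behaviour described above; not asyncpg's code."""
    if isolation == "read_committed":
        return "BEGIN;"  # no explicit level, so the session default applies
    level = isolation.replace("_", " ").upper()
    return "BEGIN ISOLATION LEVEL {};".format(level)


def effective_isolation(isolation, session_default):
    """Which level PostgreSQL would end up using for that statement."""
    if "ISOLATION LEVEL" in begin_statement(isolation):
        return isolation
    return session_default


if __name__ == "__main__":
    # Harmless while the session default is untouched ...
    print(effective_isolation("read_committed", "read_committed"))
    # ... but wrong once the session default has been changed.
    print(effective_isolation("read_committed", "serializable"))
```

In the second call the caller asked for read_committed, yet the transaction would run as serializable, because a bare BEGIN inherits whatever the session default is.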

The asyncpg bug should be fixed upstream, but for the first issue we can leverage a session-wide isolation level setter from the base SQLAlchemy dialect for PostgreSQL:

import sqlalchemy as sa
from sqlalchemy import event
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.ext.asyncio import create_async_engine


async def main():
    e = create_async_engine(
        "postgresql+asyncpg:///",
        execution_options={"isolation_level": "AUTOCOMMIT"},
    )

    def set_isolation_level(dbapi_conn, record):
        PGDialect.set_isolation_level(
            e.sync_engine.dialect,
            dbapi_conn,
            "SERIALIZABLE",
        )

    event.listen(e.sync_engine, "connect", set_isolation_level)

    async with e.connect() as conn:
        print(await conn.scalar(sa.text("SHOW TRANSACTION ISOLATION LEVEL")))
        # Outputs: serializable
