SQLAlchemy 2.0
This article explains the new updates from SQLAlchemy 1.4 and 2.0, as well as how GINO adapts to such changes.
SQLAlchemy 2.0 will deliver many breaking API changes, and SQLAlchemy 1.4 will be the "interim" version for people to eventually upgrade their software to use SQLAlchemy 2.0.
GINO | SQLAlchemy | Dialect | Comments |
---|---|---|---|
1.0.x | 1.3.x | Custom | Current (old-)stable. |
1.1.x | 1.3.x | Custom | Next old-stable. |
1.2.x | 1.3.x | Custom | Future old-stable (maybe). |
1.4.x | 1.4.x | Upstream | 2.0 Interim. |
2.0.x | 2.0.x | Upstream | Future stable. |
2.1.x | 2.0.x | Upstream | Future stable iterations. |
To make things easier, GINO will (luckily) follow the same version numbers during the transition. That is, GINO 1.4 will require SQLAlchemy 1.4, providing pre-1.4-compatible APIs with deprecations plus options to switch to the 2.0 API; GINO 2.0 will require SQLAlchemy 2.0 and provide the new APIs only.
At the same time, GINO 1.1, 1.2 and 1.3 will be reserved as the old-stable versions with new features added on SQLAlchemy 1.3. And GINO post-2.0 won't match SQLAlchemy versions.
The Async Solution
Among all the exciting updates in SQLAlchemy 1.4 / 2.0, native async support is the most significant change for GINO. Simply put, SQLAlchemy 1.4 uses greenlet to mix asynchronous I/O into the existing code base, which avoids making everything async.
Let's say we have an asynchronous method to create an asyncpg connection:
    import asyncpg

    async def connect():
        return await asyncpg.connect("postgresql:///")
And an end-user method to use it:
    async def main():
        conn = await connect()
        now = await conn.fetchval("SELECT now()")
Now instead of directly calling connect() from main(), I would like to add some additional logic - let's say, a sanity check:
    async def safe_connect():
        conn = await connect()
        try:
            await conn.execute("SELECT 1")
        except Exception:
            return None
        else:
            return conn
Then the end-user should modify main() to:
    async def main():
        conn = await safe_connect()
        if conn:
            now = await conn.fetchval("SELECT now()")
OK, everything works so far, as they are all regular async code. Here's the interesting part: suppose the logic of safe_connect() must not be written as an async def method, for example because it lives in a "sync" code base. With SQLAlchemy 1.4+, we could:
    from sqlalchemy.util import await_only, greenlet_spawn

    def sync_safe_connect():
        conn = await_only(connect())
        try:
            await_only(conn.execute("SELECT 1"))
        except Exception:
            return None
        else:
            return conn

    async def safe_connect():
        return await greenlet_spawn(sync_safe_connect)
Behind the scenes, greenlet_spawn() runs the given "sync" method in a greenlet, which uses await_only() to switch to the event loop and bridge the underlying async methods. As sync_safe_connect() is just a normal Python method, you can imagine how it works together with lots of other "sync" code asynchronously.
We're not going deeper into the implementation, but this is basically how SQLAlchemy 1.4 mixes the asyncpg driver into its "sync" code base while still being able to provide async APIs on top of it.
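To get a feel for the bridging idea without installing greenlet, here is a rough analogue built only on the standard library. It is not how SQLAlchemy does it: a worker thread stands in for the greenlet, and the names spawn_like(), await_only_like() and fake_connect() are made up for this sketch.

```python
import asyncio


async def fake_connect():
    # Stand-in for an async driver call such as asyncpg.connect().
    await asyncio.sleep(0)
    return {"alive": True}


def sync_safe_connect(await_only_like):
    # Plain "sync" code: it never uses the await keyword itself.
    conn = await_only_like(fake_connect())
    return conn if conn["alive"] else None


async def spawn_like(fn):
    loop = asyncio.get_running_loop()

    def await_only_like(coro):
        # Hand the coroutine to the event loop and block this worker
        # thread (not the loop) until the result is ready.
        return asyncio.run_coroutine_threadsafe(coro, loop).result()

    # Run the sync function off the event loop thread.
    return await loop.run_in_executor(None, fn, await_only_like)


async def safe_connect():
    return await spawn_like(sync_safe_connect)


result = asyncio.run(safe_connect())
print(result)
```

The shape is the same as in the SQLAlchemy version: the sync function calls a bridge helper, and only the outermost wrapper is an async def. A greenlet achieves this inside a single thread, which a thread pool cannot imitate.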
Async SQLAlchemy
Although greenlet might be the only practical way to port SQLAlchemy to the async world natively without having to maintain 2 copies of the same code, introducing implicit asynchrony into a large sync code base is still a risky move.
The sync library has existed for years, with many assumptions like using threading.Lock to control concurrency. When switching to asynchronous programming, such primitives and assumptions usually need to be reviewed and probably replaced by e.g. asyncio.Lock. However, the implicit approach is so convenient that most of the blocking code just works without any changes at all, which lowers the odds that anyone reviews it and finds possible issues.
As a matter of fact, some issues can only be revealed under (heavy) concurrency. For example, threading.Lock.acquire() actually works fine in a single coroutine, but 2 concurrent coroutines acquiring the same threading.Lock may easily block the main thread forever. The same applies to data structures like queues, etc. In short, there must be nothing to block the main thread.
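The lock problem can be reproduced with the standard library alone. The sketch below uses a non-blocking acquire() so that it reports the conflict instead of actually freezing the event loop; worker(), async_worker() and demo() are illustrative names, not part of SQLAlchemy or GINO.

```python
import asyncio
import threading


async def worker(lock, name, log):
    # A plain lock.acquire() would block the whole event loop thread here
    # if another coroutine held the lock, so we only probe it.
    acquired = lock.acquire(blocking=False)
    log.append((name, acquired))
    if acquired:
        try:
            await asyncio.sleep(0.01)  # context switch while holding the lock
        finally:
            lock.release()


async def async_worker(lock, name, log):
    async with lock:  # waiting here yields to the event loop
        log.append((name, True))
        await asyncio.sleep(0.01)


async def demo():
    thread_log, async_log = [], []
    t_lock = threading.Lock()
    await asyncio.gather(
        worker(t_lock, "a", thread_log), worker(t_lock, "b", thread_log)
    )
    a_lock = asyncio.Lock()
    await asyncio.gather(
        async_worker(a_lock, "a", async_log), async_worker(a_lock, "b", async_log)
    )
    return thread_log, async_log


thread_log, async_log = asyncio.run(demo())
print(thread_log)
print(async_log)
```

Coroutine "b" finds the threading.Lock already taken while "a" is suspended. With a blocking acquire() it would wait forever, because "a" can never resume on the blocked thread. With asyncio.Lock both coroutines eventually get the lock.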
Lastly, implicit asynchrony is a potential maintenance risk. Because the context switches are usually hidden behind regular sync methods, it is easy to forget that such methods may lead to concurrency issues. Treating coroutines as OS threads is a good idea and it usually works, but they are fundamentally different. Extra care is always needed when trying to support both sync and async with the same code base.
However, such risks are mostly contained within SQLAlchemy itself. End-users are still using regular explicit asynchronous APIs to leverage the async DB drivers. With proper reviewing, testing and sufficient community exposure (that's GINO's part), it is still possible and reliable to have a single SQLAlchemy with 2 sets of APIs (sync + async). For sure, the APIs are not 100% identical - for example, the ORM lazy-loading won't work because there is no place for await in accessing attributes (GINO doesn't like such implicitness anyways, so yeah).
To quickly get a picture of async SQLAlchemy (Core), here's a sample from SQLAlchemy:
    import asyncio

    from sqlalchemy import Column, Integer, MetaData, String, Table, select
    from sqlalchemy.ext.asyncio import create_async_engine

    # Table definition the sample relies on
    meta = MetaData()
    t1 = Table(
        "t1", meta, Column("id", Integer, primary_key=True), Column("name", String)
    )

    async def async_main():
        engine = create_async_engine(
            "postgresql+asyncpg://scott:tiger@localhost/test",
            echo=True,
        )

        async with engine.begin() as conn:
            await conn.run_sync(meta.drop_all)
            await conn.run_sync(meta.create_all)
            await conn.execute(
                t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
            )

        async with engine.connect() as conn:
            # select a Result, which will be delivered with buffered
            # results
            result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
            print(result.fetchall())

    asyncio.run(async_main())
Auto-Commit Complication
After using PostgreSQL for a long time, I took many features for granted. The auto-commit feature is one of them. We all know that BEGIN starts a transaction and COMMIT / ROLLBACK ends it. But what happens to SQL statements that are not wrapped in BEGIN ... COMMIT blocks?
If you do not issue a BEGIN command, then each individual statement has an implicit BEGIN and (if successful) COMMIT wrapped around it.
—PostgreSQL Documentation, 3.4. Transactions
And yes, implicit ROLLBACK if not successful. This is not directly named as an "auto-commit" feature, but PostgreSQL does enforce it. Now imagine a database whose "auto-commit" feature can be turned off - such individual statements are either executed without ACID transactions at all (no surprise, there are databases without ACID :doge:), or the session is left with an open-ended transaction to be committed later.
PEP 249 (DB-API 2.0) - the standard for Python database drivers - was created against this background. Its assumption appears to me to be that the database may not support ACID at all and, if it does, auto-commit is usually not supported, because it defines only commit() and rollback() on a connection, but no begin(). So I think DB-API assumes that, when executing statements, the connection is automatically put in a transaction (if ACID is supported), and you have to call commit() to persist your changes. Closing a connection causes pending transactions to be rolled back automatically.
Note that if the database supports an auto-commit feature, this (the auto-commit feature -- GINO comments) must be initially off.
—PEP 249
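The standard library's sqlite3 module is also a DB-API 2.0 driver, so the behavior can be observed without a database server. This is a minimal sketch assuming sqlite3's default (legacy) transaction control; SQLite stands in for PostgreSQL here, and the table name t is arbitrary.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

conn = sqlite3.connect(path)
statements = []
conn.set_trace_callback(statements.append)  # record what SQLite actually runs

conn.execute("CREATE TABLE t (x INTEGER)")
conn.commit()
conn.execute("INSERT INTO t VALUES (1)")  # the driver opens a transaction first
pending = conn.in_transaction  # nothing is committed yet
conn.close()  # no commit(): the INSERT is discarded

conn = sqlite3.connect(path)
rows_after_close = conn.execute("SELECT count(*) FROM t").fetchone()[0]

conn.execute("INSERT INTO t VALUES (2)")
conn.commit()  # the only way to persist the change
conn.close()

conn = sqlite3.connect(path)
rows_after_commit = conn.execute("SELECT count(*) FROM t").fetchone()[0]
conn.close()

# The trace contains a BEGIN that the program never wrote itself.
implicit_begin = any(s.strip().upper().startswith("BEGIN") for s in statements)
print(pending, rows_after_close, rows_after_commit, implicit_begin)
```

Note that sqlite3 in this mode only auto-begins before data-modifying statements, which is narrower than what psycopg2 does, but the pattern of "implicit BEGIN, explicit commit()" is the same.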
As the API behavior is defined, database drivers even for e.g. PostgreSQL with enforced "auto-commit" have to mimic such behavior. For example, psycopg2 will automatically emit a BEGIN to the database upon the first execution by default, so that such execution is not auto-committed. The implicit transaction boundary is a very evil thing - people constantly leave transactions open (ever seen IDLE IN TRANSACTION?), sometimes even holding database locks and eventually causing a deadlock storm.
To work around this workaround, PEP 249 does say:
An interface method may be provided to turn it (the auto-commit feature) back on.
So for psycopg2, one could do this:
    import psycopg2

    conn = psycopg2.connect("postgresql:///")
    conn.autocommit = True
    conn.cursor().execute("SELECT now()")
Now the database correctly receives this SELECT statement only, without any implicit BEGIN surprises. But when we want to have explicit transactions, DB-API only gives us 2 options: 1) Do it manually:
    conn.cursor().execute("BEGIN")
    conn.cursor().execute("UPDATE ...")
    conn.cursor().execute("COMMIT")
Or 2) turn auto-commit off again:
    conn.autocommit = False
    conn.cursor().execute("UPDATE ...")
    conn.commit()
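Option 1) can be tried out with sqlite3 as a stand-in for psycopg2. One assumption to keep in mind: sqlite3's switch for driver-level auto-commit is isolation_level=None at connect time, not the conn.autocommit attribute shown above.

```python
import sqlite3

# isolation_level=None turns off sqlite3's implicit BEGIN.
conn = sqlite3.connect(":memory:", isolation_level=None)
statements = []
conn.set_trace_callback(statements.append)  # record what SQLite actually runs

conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (1)")  # committed immediately
autocommit_in_tx = conn.in_transaction  # no implicit transaction is open

conn.execute("BEGIN")  # option 1: control the transaction manually
conn.execute("INSERT INTO t VALUES (2)")
manual_in_tx = conn.in_transaction
conn.execute("ROLLBACK")

count = conn.execute("SELECT count(*) FROM t").fetchone()[0]
conn.close()

begins = [s for s in statements if s.strip().upper().startswith("BEGIN")]
print(autocommit_in_tx, manual_in_tx, count, begins)
```

The only BEGIN in the trace is the one the program issued itself, and only the first INSERT survives the ROLLBACK.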
I know this is frustrating (or maybe people have accepted it), but newer database drivers like asyncpg do provide a cleaner API, by not complying with PEP 249:
    import asyncpg

    async def main():
        conn = await asyncpg.connect("postgresql://")
        print(await conn.fetchval("SELECT now()"))  # SELECT now();
        async with conn.transaction():  # BEGIN;
            await conn.execute("UPDATE ...")  # UPDATE ...;
            # COMMIT;
It's much cleaner to see what's actually happening on the wire to the database. This is also how GINO works.
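The same "what you write is what gets sent" style can be sketched on top of any auto-commit connection with a small context manager. The transaction() helper below is hypothetical, only loosely modelled on asyncpg's conn.transaction(), and uses sqlite3 so it runs without a server.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(conn):
    # Hypothetical helper: explicit BEGIN on entry, COMMIT or ROLLBACK on exit.
    conn.execute("BEGIN")
    try:
        yield conn
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")


conn = sqlite3.connect(":memory:", isolation_level=None)  # driver-level auto-commit
conn.execute("CREATE TABLE t (x INTEGER)")

with transaction(conn):
    conn.execute("INSERT INTO t VALUES (1)")

try:
    with transaction(conn):
        conn.execute("INSERT INTO t VALUES (2)")
        raise RuntimeError("boom")  # forces the ROLLBACK path
except RuntimeError:
    pass

values = [row[0] for row in conn.execute("SELECT x FROM t ORDER BY x")]
print(values)
```

Every transaction boundary is visible at the call site, so nothing is left open by accident.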
SQLAlchemy for DB-API
Because SQLAlchemy is built on PEP 249 (DB-API 2.0), its API is also greatly affected by the PEP standard. For example, imagine what SQL is actually executed by this code:
    import sqlalchemy as sa

    e = sa.create_engine("postgresql:///", future=True)
    with e.connect() as conn:
        conn.scalar(sa.text("SELECT now()"))
Only SELECT now()? No. Here's the answer:
    with e.connect() as conn:  # BEGIN; SELECT version(); ...; ROLLBACK;
        conn.scalar(sa.text("SELECT now()"))  # BEGIN; SELECT now();
        # ROLLBACK;
Note
We are using the SQLAlchemy 2.0 API for simplicity, by setting future=True in SQLAlchemy 1.4. It's way more complicated in SQLAlchemy 1.3 and I don't want to get into that.
The reason behind this is that connections have "auto-commit" turned off by default. If there is no transaction, an implicit BEGIN will be automatically executed upon the first statement. This applies to async SQLAlchemy too - even though the underlying asyncpg is not DB-API-compliant, the AsyncpgDialect in SQLAlchemy still wraps asyncpg and simulates a compatible DB-API. Like this:
    import sqlalchemy as sa
    from sqlalchemy.ext.asyncio import create_async_engine

    async def main():
        e = create_async_engine("postgresql+asyncpg:///")
        async with e.connect() as conn:  # BEGIN; SELECT version(); ...; ROLLBACK;
            await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
            # ROLLBACK;
If you want to modify the database permanently, you have to commit() the implicit transaction explicitly:
    async def main():
        e = ...
        async with e.connect() as conn:
            await conn.execute(sa.text("UPDATE ..."))  # BEGIN; UPDATE ...;
            await conn.commit()  # COMMIT;
Or use the explicit transaction API:
    async def main():
        e = ...
        async with e.connect() as conn:
            async with conn.begin():  # BEGIN;
                await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                # COMMIT;
Please note that SQLAlchemy 2.0 doesn't allow soft-nested transactions. In other words, you cannot nest 2 async with conn.begin(): blocks like this:
    async def main():
        e = ...
        async with e.connect() as conn:
            async with conn.begin():  # BEGIN;
                async with conn.begin():  # Error: a transaction is already begun
                    ...
This limitation applies to implicit transactions too, even though it's weird:
    async def main():
        e = ...
        async with e.connect() as conn:
            await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
            async with conn.begin():  # Error: a transaction is already begun
                ...
You have to explicitly close this implicit transaction in order for an explicit transaction to start successfully:
    async def main():
        e = ...
        async with e.connect() as conn:
            await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
            await conn.rollback()  # ROLLBACK;
            async with conn.begin():  # BEGIN;
                await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                # COMMIT;
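The rules above boil down to a small state machine. The following toy model is not SQLAlchemy code; ToyConnection and its wire list are invented here purely to make the sequence of statements easy to follow.

```python
class ToyConnection:
    """Toy model of the behaviour described above; not SQLAlchemy code."""

    def __init__(self):
        self.in_transaction = False
        self.wire = []  # statements that would reach the database

    def _begin(self):
        self.in_transaction = True
        self.wire.append("BEGIN")

    def execute(self, sql):
        if not self.in_transaction:
            self._begin()  # implicit transaction on first statement
        self.wire.append(sql)

    def begin(self):
        if self.in_transaction:
            raise RuntimeError("a transaction is already begun")
        self._begin()

    def commit(self):
        if self.in_transaction:
            self.wire.append("COMMIT")
            self.in_transaction = False

    def rollback(self):
        if self.in_transaction:
            self.wire.append("ROLLBACK")
            self.in_transaction = False


conn = ToyConnection()
conn.execute("SELECT now()")  # opens the implicit transaction
try:
    conn.begin()  # conflicts with the implicit transaction
    error = None
except RuntimeError as exc:
    error = str(exc)
conn.rollback()  # close the implicit transaction first
conn.begin()
conn.execute("UPDATE ...")
conn.commit()
print(error)
print(conn.wire)
```

An explicit begin() only succeeds when no transaction, implicit or explicit, is currently open.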
Similar to Core, SQLAlchemy ORM follows the same principle. Grab a session, use it without begin(), and when you want to commit, commit(). Or, use an explicit transaction in a with session.begin(): block. Personally I don't like that much, but it is how SQLAlchemy manages transactions. Please follow the link above to read more.
SQLAlchemy AUTOCOMMIT
I know you already miss the WYSIWYG asyncpg and GINO API. Hang in there, let's build GINO 1.4 together with the SQLAlchemy AUTOCOMMIT feature.
To turn AUTOCOMMIT back on, we need to set the isolation_level to AUTOCOMMIT in execution_options:
    async def main():
        e = create_async_engine(
            "postgresql+asyncpg:///",
            execution_options={"isolation_level": "AUTOCOMMIT"},
        )
        async with e.connect() as conn:
            await conn.execute(sa.text("SELECT now()"))  # SELECT now();
Hooray! No more implicit BEGIN magic. We're one step closer.
Note
There is also a keyword argument:
    e = create_async_engine(
        "postgresql+asyncpg:///",
        isolation_level="AUTOCOMMIT",
    )
But this is implemented very differently from execution_options, and I don't think it works for GINO's use case.
The next question is, how do we explicitly start a transaction? Let's try begin():
    async def main():
        e = ...
        async with e.connect() as conn:
            await conn.execute(sa.text("SELECT now()"))  # SELECT now();
            async with conn.begin():  # Error: a transaction is already begun
                ...
Wait a second ... we've seen this error before! It is the implicit transaction conflicting with the explicit transaction. But I thought we were in AUTOCOMMIT mode? Even though the isolation_level tells the driver not to send BEGIN to the database, SQLAlchemy still manages library-level transaction objects. We have to close the virtual implicit transaction before starting an explicit transaction:
    async def main():
        e = ...
        async with e.connect() as conn:
            await conn.execute(sa.text("SELECT now()"))  # SELECT now();
            await conn.rollback()  # no-op
            async with conn.begin():  # no-op
                await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
Well, not quite what we expected. With AUTOCOMMIT set, all of begin(), commit() and rollback() become no-ops.
Similar to the answers for psycopg2, we have 2 options here too: 1) manually execute transaction-control SQL statements, or 2) turn off AUTOCOMMIT temporarily. As we want to be more compatible with SQLAlchemy, let's try 2):
    async def main():
        e = ...
        async with e.connect() as conn:
            await conn.execute(sa.text("SELECT now()"))  # SELECT now();
            await conn.rollback()  # no-op
            async with conn.execution_options(
                isolation_level="READ COMMITTED"
            ).begin():  # BEGIN;
                await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                # COMMIT;
It's working! According to the SQLAlchemy docs, execution_options() creates a shallow copy of the connection, and applies new values only to the copy. So the original connection should still be in AUTOCOMMIT, right? Well...
    async def main():
        e = ...
        async with e.connect() as conn:
            ...
            async with conn.execution_options(
                isolation_level="READ COMMITTED"
            ).begin():  # BEGIN;
                await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                # COMMIT;
            await conn.execute(sa.text("SELECT now()"))  # BEGIN; SELECT now();
            # ROLLBACK;
Unfortunately, the implicit transaction is haunting us again. This is because both the original connection and its shallow copy point to the same "DB-API" connection (in this case, a SQLAlchemy wrapper of the asyncpg connection), and setting isolation_level modifies the value on that "DB-API" connection.
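The leak is easier to see in a stripped-down model. Both classes below are toys written for this explanation, under the assumption that the wrapper is shallow-copied while the driver-level connection is shared; the real SQLAlchemy classes are far more involved.

```python
import copy


class ToyDBAPIConnection:
    # Stand-in for the driver-level connection object.
    def __init__(self):
        self.isolation_level = "AUTOCOMMIT"


class ToyConnection:
    """Toy wrapper; not the actual SQLAlchemy Connection."""

    def __init__(self, dbapi_conn):
        self.dbapi_conn = dbapi_conn
        self.options = {}

    def execution_options(self, **opts):
        clone = copy.copy(self)  # shallow copy of the wrapper
        clone.options = {**self.options, **opts}  # options live on the copy only
        if "isolation_level" in opts:
            # ...but the level is written to the shared driver connection.
            clone.dbapi_conn.isolation_level = opts["isolation_level"]
        return clone


conn = ToyConnection(ToyDBAPIConnection())
tx_conn = conn.execution_options(isolation_level="READ COMMITTED")

same_driver = tx_conn.dbapi_conn is conn.dbapi_conn
leaked_level = conn.dbapi_conn.isolation_level
original_options = conn.options
print(same_driver, leaked_level, original_options)
```

The original wrapper's own options are untouched, yet the connection it talks through is no longer in AUTOCOMMIT.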
Returning a SQLAlchemy connection back to the pool resets the isolation_level to its default value, and acquiring the same connection again will initialize the isolation_level with values from execution_options of the engine. But if we want to keep using the same connection without returning it, we have to manually overwrite its isolation_level again:
    async def main():
        e = ...
        async with e.connect() as conn:
            ...
            async with conn.execution_options(
                isolation_level="READ COMMITTED"
            ).begin():  # BEGIN;
                await conn.execute(sa.text("UPDATE.."))  # UPDATE ...;
                # COMMIT;
            conn.execution_options(isolation_level="AUTOCOMMIT")
            await conn.execute(sa.text("SELECT now()"))  # SELECT now();
Eventually we made it!
Encapsulating all such logic, GINO 1.4 could then provide decent WYSIWYG APIs again:
    import gino

    async def main():
        engine = await gino.create_engine("postgresql:///")
        async with engine.acquire() as conn:
            await conn.scalar("SELECT now()")  # SELECT now();
            async with conn.transaction():  # BEGIN;
                await conn.status("UPDATE ...")  # UPDATE ...;
                # COMMIT;
Tip
Now I feel that "implementing" the auto-commit feature is more like restoring the original database behavior, and having auto-commit turned off by default should be considered a new feature called "auto-begin" or "implicit transaction". And it's a bad design introduced in early PEP 249, affecting SQLAlchemy and the ecosystem.
Isolation Levels
So far, we have only used 2 isolation_level values:
- AUTOCOMMIT
- READ COMMITTED
AUTOCOMMIT is not a valid PostgreSQL isolation level. It's only recognized and consumed by the SQLAlchemy asyncpg dialect to bypass the "auto-begin" simulation.
READ COMMITTED is the default PostgreSQL isolation level. You can verify this by executing SQL directly in a transaction:
    # BEGIN;
    BEGIN
    # SHOW TRANSACTION ISOLATION LEVEL;
     transaction_isolation
    -----------------------
     read committed
    (1 row)
    # ROLLBACK;
    ROLLBACK
To start a transaction in a different isolation level, you may:
    # BEGIN ISOLATION LEVEL SERIALIZABLE;
    BEGIN
    # SHOW TRANSACTION ISOLATION LEVEL;
     transaction_isolation
    -----------------------
     serializable
    (1 row)
    # ROLLBACK;
    ROLLBACK
As mentioned earlier, all PostgreSQL statements are executed in a transaction. If no explicit BEGIN is in place, an implicit transaction is used. So this SQL also works on its own:
    # SHOW TRANSACTION ISOLATION LEVEL;
     transaction_isolation
    -----------------------
     read committed
    (1 row)
But how could we modify the isolation level of such implicit transactions? The answer is to set the isolation level session-wide. This affects all subsequent transactions, both implicit and explicit, except for explicit transactions that specify their own isolation level:
    # SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;
    SET
    # SHOW TRANSACTION ISOLATION LEVEL;
     transaction_isolation
    -----------------------
     serializable
    (1 row)
    # BEGIN;
    BEGIN
    # SHOW TRANSACTION ISOLATION LEVEL;
     transaction_isolation
    -----------------------
     serializable
    (1 row)
    # ROLLBACK;
    ROLLBACK
    # BEGIN ISOLATION LEVEL READ COMMITTED;
    BEGIN
    # SHOW TRANSACTION ISOLATION LEVEL;
     transaction_isolation
    -----------------------
     read committed
    (1 row)
    # ROLLBACK;
    ROLLBACK
Then let's see how SQLAlchemy with asyncpg solves this problem:
    async def main():
        e = create_async_engine(
            "postgresql+asyncpg:///",
            execution_options={"isolation_level": "SERIALIZABLE"},
        )
        async with e.connect() as conn:
            async with conn.begin():  # BEGIN ISOLATION LEVEL SERIALIZABLE;
                await conn.execute(sa.text("UPDATE ..."))  # UPDATE ...;
                # COMMIT;
Underneath, SQLAlchemy is leveraging asyncpg's Connection.transaction(isolation="...") to set the isolation level per transaction. In GINO, we just need to store the user-defined isolation level, and set it before transactions.
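As a rough sketch of "store the level, then apply it", the helpers below only build the SQL strings that appeared in the psql sessions earlier. begin_sql(), session_sql() and normalize() are hypothetical names, not GINO or SQLAlchemy APIs, and the underscore handling is simply a convenience assumed for this example.

```python
# The four isolation levels PostgreSQL accepts in SQL.
VALID_LEVELS = {
    "SERIALIZABLE",
    "REPEATABLE READ",
    "READ COMMITTED",
    "READ UNCOMMITTED",
}


def normalize(level):
    # Accept spellings such as "read_committed" as well as "READ COMMITTED".
    level = level.replace("_", " ").upper()
    if level not in VALID_LEVELS:
        raise ValueError(f"unknown isolation level: {level!r}")
    return level


def begin_sql(level=None):
    # Without a stored level, fall back to a plain BEGIN.
    if level is None:
        return "BEGIN"
    return f"BEGIN ISOLATION LEVEL {normalize(level)}"


def session_sql(level):
    # Session-wide default, which also covers implicit transactions.
    return (
        "SET SESSION CHARACTERISTICS AS TRANSACTION "
        f"ISOLATION LEVEL {normalize(level)}"
    )


print(begin_sql("serializable"))
print(session_sql("read_committed"))
```

Validating against a fixed set matters because the level is interpolated into SQL text rather than passed as a bound parameter.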
But there are 2 issues:
- The user-defined isolation level is not applied to PostgreSQL implicit transactions (a.k.a. auto-commit statements), because no one issues SET SESSION.
- asyncpg has a bug: Connection.transaction(isolation="read_committed") always emits BEGIN without an explicit isolation level, regardless of the actual default isolation level.
The asyncpg bug should be fixed upstream, but we could leverage a session-wide isolation level setter from the base SQLAlchemy dialect for PostgreSQL:
    import sqlalchemy as sa
    from sqlalchemy import event
    from sqlalchemy.dialects.postgresql.base import PGDialect
    from sqlalchemy.ext.asyncio import create_async_engine

    async def main():
        e = create_async_engine(
            "postgresql+asyncpg:///",
            execution_options={"isolation_level": "AUTOCOMMIT"},
        )

        def set_isolation_level(dbapi_conn, record):
            PGDialect.set_isolation_level(
                e.sync_engine.dialect,
                dbapi_conn,
                "SERIALIZABLE",
            )

        event.listen(e.sync_engine, "connect", set_isolation_level)
        async with e.connect() as conn:
            print(await conn.scalar(sa.text("SHOW TRANSACTION ISOLATION LEVEL")))
            # Outputs: serializable