sqlalchemy mixin:after_create 未在子进程中触发
我正在开发 pq 库(PostgreSQL 驱动的 python 队列系统)的 ORM 风格版本,用户可以在其中有自己的队列模型。它还添加了批量插入/获取、异步支持等功能(如果一切顺利,我将能够发布它)。
我在创建触发器(我使用 PostgreSQL 通知系统)创建表后自动时遇到困难(我想让使用尽可能简单,这样会好得多而不是添加一个额外的类方法来创建触发器)。
这与此帖子中的答案类似,但是我无法使用此解决方案,因为我需要传递连接< /strong> (用于通过检查连接的方言来转义 SQL 标识符以及检查对象是否预先存在)。
这是我根据我之前提到的帖子所做的尝试。我为冗长的代码表示歉意,但我认为我必须包含一些上下文。
基本模型
from sqlalchemy import (BIGINT, Column, func, Index, nullslast,
nullsfirst, SMALLINT, TIMESTAMP)
from sqlalchemy.orm import declared_attr, declarative_mixin
from sqlalchemy.event import listens_for
# this is the function that returns the base model
def postgres_queue_base(schema:str='public', tz_aware:bool=True, use_trigger:bool=True) -> 'PostgresQueueBase':
@declarative_mixin # this is only for MyPy, it does not modify anything
class PostgresQueueBase:
__tablename__ = 'queue'
@declared_attr
def __table_args__(cls):
return (Index(nullsfirst(cls.schedule_at), nullslast(cls.dequeued_at), postgresql_where=(cls.dequeued_at == None)),
{'schema':schema})
id = Column('id', BIGINT, primary_key=True)
internal_mapping = Column('internal_mapping', BIGINT, nullable=False)
enqueued_at = Column('enqueued_at', TIMESTAMP(timezone=tz_aware), nullable=False, server_default=func.now())
dequeued_at = Column('dequeued_at', TIMESTAMP(timezone=tz_aware))
expected_at = Column(TIMESTAMP(timezone=tz_aware))
schedule_at = Column(TIMESTAMP(timezone=tz_aware))
status = Column(SMALLINT, index=True)
@listens_for(PostgresQueueBase, "instrument_class", propagate=True)
def instrument_class(mapper, class_):
print('EVENT INSTRUMENT CLASS')
if use_trigger and mapper.local_table is not None:
trigger_for_table(table=mapper.local_table)
def trigger_for_table(table):
print('Registering after_create event')
@listens_for(table, "after_create")
def create_trigger(table, connection):
print('AFTER CREATE EVENT')
# code that creates triggers and logs that (here I'll just print something and put pseudo code in a comment)
# trig = PostgresQueueTrigger(schema=get_schema_from_model(table), table_name=table.name, connection=connection)
# trig.add_trigger()
print('Creating notify function public.notify_job')
# unique trigger name using hash of schema.table_name (avoids problems with long names and special chars)
print('Creating trigger trigger_job_5d69fc3870b446d0a1f56a793b799ae3')
return PostgresQueueBase
当我尝试基本模型时,
from sqlalchemy import Column, create_engine, INTEGER, TEXT
from sqlalchemy.orm import declarative_base
# IMPORTANT: inherit both a declarative base AND the postgres queue base
Base = declarative_base()
PostgresQueueBase = postgres_queue_base(schema='public')
# create custom queue model
class MyQueue(Base, PostgresQueueBase):
# optional custom table name (by default it is "queue")
__tablename__ = 'demo_queue'
# custom columns
operation = Column(TEXT)
project_id = Column(INTEGER)
# create table in database
# change connection string accordingly!
engine = create_engine('postgresql://username:password@localhost:5432/postgres')
Base.metadata.create_all(bind=engine)
EVENT INSTRUMENT CLASS
Registering after_create event
我看不到“创建事件之后”打印出来
I am working on an ORM style version of the pq library (PostgreSQL powered python queue system) where users can have their own queue model. It also has added features such as bulk insert/get, asynchronous support and more (if all goes well I'll be able to publish it).
I am having difficulties creating a trigger (I use a PostgreSQL notification system) automatically after table creation (I want to make the usage as simple as possible so that would be much better than adding an additional classmethod for creating the trigger).
This is similar to the answer in this post however I cannot use this solution because I need to pass a connection (for escaping SQL identifiers by checking the dialect of the connection and for checking if objects exist beforehand).
Here is my attempt at it based on the post I mentionned earlier. I apologize for the long code but I figured I had to include a bit of context.
Base model
from sqlalchemy import (BIGINT, Column, func, Index, nullslast,
nullsfirst, SMALLINT, TIMESTAMP)
from sqlalchemy.orm import declared_attr, declarative_mixin
from sqlalchemy.event import listens_for
# this is the function that returns the base model
def postgres_queue_base(schema:str='public', tz_aware:bool=True, use_trigger:bool=True) -> 'PostgresQueueBase':
@declarative_mixin # this is only for MyPy, it does not modify anything
class PostgresQueueBase:
__tablename__ = 'queue'
@declared_attr
def __table_args__(cls):
return (Index(nullsfirst(cls.schedule_at), nullslast(cls.dequeued_at), postgresql_where=(cls.dequeued_at == None)),
{'schema':schema})
id = Column('id', BIGINT, primary_key=True)
internal_mapping = Column('internal_mapping', BIGINT, nullable=False)
enqueued_at = Column('enqueued_at', TIMESTAMP(timezone=tz_aware), nullable=False, server_default=func.now())
dequeued_at = Column('dequeued_at', TIMESTAMP(timezone=tz_aware))
expected_at = Column(TIMESTAMP(timezone=tz_aware))
schedule_at = Column(TIMESTAMP(timezone=tz_aware))
status = Column(SMALLINT, index=True)
@listens_for(PostgresQueueBase, "instrument_class", propagate=True)
def instrument_class(mapper, class_):
print('EVENT INSTRUMENT CLASS')
if use_trigger and mapper.local_table is not None:
trigger_for_table(table=mapper.local_table)
def trigger_for_table(table):
print('Registering after_create event')
@listens_for(table, "after_create")
def create_trigger(table, connection):
print('AFTER CREATE EVENT')
# code that creates triggers and logs that (here I'll just print something and put pseudo code in a comment)
# trig = PostgresQueueTrigger(schema=get_schema_from_model(table), table_name=table.name, connection=connection)
# trig.add_trigger()
print('Creating notify function public.notify_job')
# unique trigger name using hash of schema.table_name (avoids problems with long names and special chars)
print('Creating trigger trigger_job_5d69fc3870b446d0a1f56a793b799ae3')
return PostgresQueueBase
When I try the base model
from sqlalchemy import Column, create_engine, INTEGER, TEXT
from sqlalchemy.orm import declarative_base
# IMPORTANT: inherit both a declarative base AND the postgres queue base
Base = declarative_base()
PostgresQueueBase = postgres_queue_base(schema='public')
# create custom queue model
class MyQueue(Base, PostgresQueueBase):
# optional custom table name (by default it is "queue")
__tablename__ = 'demo_queue'
# custom columns
operation = Column(TEXT)
project_id = Column(INTEGER)
# create table in database
# change connection string accordingly!
engine = create_engine('postgresql://username:password@localhost:5432/postgres')
Base.metadata.create_all(bind=engine)
EVENT INSTRUMENT CLASS
Registering after_create event
I cannot see "AFTER CREATE EVENT" printed out ????. How do I get the "after_create" event to be fired?
Thanks in advance for your help ????!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
抱歉,我终于弄清楚了...该表已经存在,因此事件从未触发。另外,上面的代码在事件中存在一些错误(我无法测试它们,因为它们没有被执行),并且 table_args 中的复合索引以某种方式获得名称“”“NULLS FIRST””。我使用哈希以获得更好的名称并避免字符限制或转义问题。
Sorry, I finally figured it out... The table already existed so the events were never firing. Also the code above has some errors in the events (I could not test them since they were not being executed) and the composite index in table_args somehow gets the name """ NULLS FIRST"". I used a hash to have a better name and avoid problems with character limitation or escaping.