How to do an upsert with SQLAlchemy?

Posted on 2024-12-01 13:02:20

I have a record that I want to exist in the database if it is not there, and if it is there already (primary key exists) I want the fields to be updated to the current state. This is often called an upsert.

The following incomplete code snippet demonstrates what will work, but it seems excessively clunky (especially if there were a lot more columns). What is the better/best way?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desired_default.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

Is there a better or less verbose way of doing this? Something like this would be great:

sess.upsert_this(desired_default, unique_key = "name")

Although the unique_key kwarg is obviously unnecessary (the ORM should be able to easily figure this out), I added it just because SQLAlchemy tends to only work with the primary key. E.g., I've been looking at whether Session.merge would be applicable, but this works only on the primary key, which in this case is an autoincrementing id and not terribly useful for this purpose.

A sample use case for this is simply when starting up a server application that may have upgraded its default expected data, i.e. there are no concurrency concerns for this upsert.
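
One way to keep the lookup-then-update logic from growing with the number of columns is to drive it from a dict of values. The sketch below is only an illustration: it uses Python's standard-library sqlite3 instead of SQLAlchemy so that it runs without a configured Session, and the helper name upsert_by_key is hypothetical. With the ORM, the same idea would presumably become a loop of setattr calls over the same dict.

```python
import sqlite3

def upsert_by_key(conn, table, key, values):
    """Insert `values` into `table`, or update the row whose `key` column matches.

    Table and column names are interpolated into the SQL, so they must come
    from trusted code, never from user input.
    """
    found = conn.execute(
        f"SELECT 1 FROM {table} WHERE {key} = ?", (values[key],)
    ).fetchone()
    if found is None:
        cols = ", ".join(values)
        marks = ", ".join("?" for _ in values)
        conn.execute(
            f"INSERT INTO {table} ({cols}) VALUES ({marks})", tuple(values.values())
        )
    else:
        others = [c for c in values if c != key]
        assignments = ", ".join(f"{c} = ?" for c in others)
        params = [values[c] for c in others] + [values[key]]
        conn.execute(f"UPDATE {table} SET {assignments} WHERE {key} = ?", params)

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE templates ("
    "id INTEGER PRIMARY KEY, name TEXT UNIQUE, template TEXT UNIQUE, description TEXT)"
)
default = {"name": "default", "template": "AABBCC", "description": "This is the default template"}
upsert_by_key(conn, "templates", "name", default)
upsert_by_key(conn, "templates", "name", {**default, "description": "Updated description"})
rows = conn.execute("SELECT id, name, template, description FROM templates").fetchall()
```

Like the snippet in the question, this does a SELECT followed by a write, so it is only appropriate when there are no concurrent writers.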


Comments (12)

撑一把青伞 2024-12-08 13:02:20

SQLAlchemy supports ON CONFLICT with two methods on_conflict_do_update() and on_conflict_do_nothing().

Copying from the documentation:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)
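
To see what such a statement does at the SQL level, here is a minimal sketch using the standard-library sqlite3 module (SQLite 3.24 and later accept an ON CONFLICT ... DO UPDATE clause much like PostgreSQL's). The table layout and the email address are assumptions made for the illustration, and the SQL that SQLAlchemy renders for PostgreSQL may differ in detail.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (user_email TEXT PRIMARY KEY, data TEXT)")

# `excluded` refers to the row that was proposed for insertion.
UPSERT = (
    "INSERT INTO my_table (user_email, data) VALUES (?, ?) "
    "ON CONFLICT (user_email) DO UPDATE SET data = excluded.data"
)
conn.execute(UPSERT, ("user@example.com", "inserted data"))  # no conflict: inserts
conn.execute(UPSERT, ("user@example.com", "updated data"))   # conflict: updates
rows = conn.execute("SELECT user_email, data FROM my_table").fetchall()
```

The stmt.excluded attribute in the SQLAlchemy example plays the same role as the excluded alias in the raw SQL.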
獨角戲 2024-12-08 13:02:20

SQLAlchemy does have a "save-or-update" behavior, which in recent versions has been built into session.add, but previously was the separate session.saveorupdate call. This is not an "upsert" but it may be good enough for your needs.

It is good that you are asking about a class with multiple unique keys; I believe this is precisely the reason there is no single correct way to do this. The primary key is also a unique key. If there were no unique constraints, only the primary key, it would be a simple enough problem: if nothing with the given ID exists, or if ID is None, create a new record; else update all other fields in the existing record with that primary key.

However, when there are additional unique constraints, there are logical issues with that simple approach. If you want to "upsert" an object, and the primary key of your object matches an existing record, but another unique column matches a different record, then what do you do? Similarly, if the primary key matches no existing record, but another unique column does match an existing record, then what? There may be a correct answer for your particular situation, but in general I would argue there is no single correct answer.

That would be the reason there is no built in "upsert" operation. The application must define what this means in each particular case.
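
The ambiguity described above can be reproduced in a few lines. This sketch uses the standard-library sqlite3 module and an assumed two-column table; it proposes a row whose primary key matches one existing record while its unique name matches another, and shows that a primary-key upsert does not resolve that case.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE templates (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
conn.executemany("INSERT INTO templates (id, name) VALUES (?, ?)", [(1, "a"), (2, "b")])

# id=1 matches the first row, name='b' matches the second row.
try:
    conn.execute(
        "INSERT INTO templates (id, name) VALUES (1, 'b') "
        "ON CONFLICT (id) DO UPDATE SET name = excluded.name"
    )
    outcome = "updated"
except sqlite3.IntegrityError as exc:
    outcome = f"rejected: {exc}"
rows = conn.execute("SELECT id, name FROM templates ORDER BY id").fetchall()
```

The database refuses the statement and leaves both rows untouched, so the application still has to decide which of the two records, if any, should win.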

梦境 2024-12-08 13:02:20

Nowadays, SQLAlchemy provides two helpful functions, on_conflict_do_nothing and on_conflict_do_update. Those functions are useful but require you to switch from the ORM interface to the lower-level one, SQLAlchemy Core.

Although those two functions make upserting using SQLAlchemy's syntax not that difficult, these functions are far from providing a complete out-of-the-box solution to upserting.

My common use case is to upsert a big chunk of rows in a single SQL query/session execution. I usually encounter two problems with upserting:

For example, higher level ORM functionalities we've gotten used to are missing. You cannot use ORM objects but instead have to provide ForeignKeys at the time of insertion.

I'm using the following function, which I wrote, to handle both of those issues:

from sqlalchemy import UniqueConstraint, inspect
from sqlalchemy.dialects import postgresql

def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)
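
The duplicate filtering in that function matters because PostgreSQL rejects a single INSERT ... ON CONFLICT DO UPDATE that would touch the same row twice ("ON CONFLICT DO UPDATE command cannot affect row a second time"). The sketch below isolates that step in plain Python. The helper name drop_duplicate_rows is hypothetical, and it is a slight variation on the original: a unique-key signature is recorded only for rows that are kept.

```python
def drop_duplicate_rows(rows, unique_keys):
    """Keep the first row for each combination of values in `unique_keys`.

    `unique_keys` is a list of column-name tuples, one per unique constraint.
    """
    seen = set()
    kept = []
    for row in rows:
        # One (columns, values) signature per unique constraint.
        signatures = [(cols, tuple(row[c] for c in cols)) for cols in unique_keys]
        if any(sig in seen for sig in signatures):
            continue  # a previous row already claimed one of these unique values
        seen.update(signatures)
        kept.append(row)
    return kept

rows = [
    {"name": "default", "template": "AABBCC"},
    {"name": "default", "template": "DDEEFF"},  # repeats the name
    {"name": "other", "template": "112233"},
]
unique_rows = drop_duplicate_rows(rows, [("name",), ("template",)])
```

Keeping the first occurrence is a design choice; keeping the last one would be equally defensible when later rows are meant to override earlier ones.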
凉栀 2024-12-08 13:02:20

I use a "look before you leap" approach:

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()

The advantage is that this is db-neutral and I think it's clear to read. The disadvantage is that there's a potential race condition in a scenario like the following:

  • we query the db for a switch_command and don't find one
  • we create a switch_command
  • another process or thread creates a switch_command with the same primary key as ours
  • we try to commit our switch_command
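
One common way to narrow that window is to let the primary key constraint arbitrate: attempt the INSERT first and fall back to an UPDATE when the database reports a conflict. The following is a minimal sketch of that idea using the standard-library sqlite3 module, with an assumed table layout and a hypothetical helper name save_output; it is not the ORM code from the answer above.

```python
import sqlite3

def save_output(conn, switch_id, command_id, output):
    """Try the INSERT first; if the key already exists, UPDATE instead."""
    try:
        conn.execute(
            "INSERT INTO switch_command (switch_id, command_id, output) VALUES (?, ?, ?)",
            (switch_id, command_id, output),
        )
        return "inserted"
    except sqlite3.IntegrityError:
        conn.execute(
            "UPDATE switch_command SET output = ? WHERE switch_id = ? AND command_id = ?",
            (output, switch_id, command_id),
        )
        return "updated"

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE switch_command ("
    "switch_id INTEGER, command_id INTEGER, output TEXT, "
    "PRIMARY KEY (switch_id, command_id))"
)
first = save_output(conn, 1, 7, "Hooray!")
second = save_output(conn, 1, 7, "Hooray again!")
rows = conn.execute("SELECT switch_id, command_id, output FROM switch_command").fetchall()
```

This still is not fully atomic (the row could be deleted between the failed INSERT and the UPDATE), which is where a native ON CONFLICT clause helps.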
蝶…霜飞 2024-12-08 13:02:20

The code below works for me with a Redshift database and also handles a composite primary key constraint.

SOURCE : this

Only a few modifications are required to create the SQLAlchemy engine in the function start_engine():

import os

from sqlalchemy import Column, Integer, Date, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql

Base = declarative_base()

def start_engine():
    engine = create_engine(os.getenv('SQLALCHEMY_URI',
                                     'postgresql://localhost:5432/upsert'))
    meta = MetaData()
    meta.reflect(bind=engine)
    # Return a session rather than the engine: upsert() below calls session.execute()
    return sessionmaker(bind=engine)()


class DigitalSpend(Base):
    __tablename__ = 'digital_spend'
    report_date = Column(Date, nullable=False)
    day = Column(Date, nullable=False, primary_key=True)
    impressions = Column(Integer)
    conversions = Column(Integer)

    def __repr__(self):
        return str([getattr(self, c.name, None) for c in self.__table__.c])


def compile_query(query):
    compiler = (query.compile if not hasattr(query, 'statement')
                else query.statement.compile)
    return compiler(dialect=postgresql.dialect())


def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
    table = model.__table__

    stmt = insert(table).values(rows)

    update_cols = [c.name for c in table.c
                   if c not in list(table.primary_key.columns)
                   and c.name not in no_update_cols]

    on_conflict_stmt = stmt.on_conflict_do_update(
        index_elements=table.primary_key.columns,
        set_={k: getattr(stmt.excluded, k) for k in update_cols},
        # `where` restricts the DO UPDATE; `index_where` is only for partial-index inference
        where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
        )

    print(compile_query(on_conflict_stmt))
    session.execute(on_conflict_stmt)


session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])
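
The intent of the as_of_date_col comparison appears to be "only overwrite a row when the incoming report is newer". At the SQL level that corresponds to a WHERE clause on DO UPDATE. The sketch below shows the behaviour with the standard-library sqlite3 module, storing dates as ISO strings (an assumption made so that plain string comparison orders them correctly) and leaving conversions out of the SET list to mirror no_update_cols.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE digital_spend ("
    "day TEXT PRIMARY KEY, report_date TEXT NOT NULL, "
    "impressions INTEGER, conversions INTEGER)"
)
UPSERT = (
    "INSERT INTO digital_spend (day, report_date, impressions, conversions) "
    "VALUES (?, ?, ?, ?) "
    "ON CONFLICT (day) DO UPDATE SET "
    "report_date = excluded.report_date, impressions = excluded.impressions "
    "WHERE digital_spend.report_date < excluded.report_date"
)
conn.execute(UPSERT, ("2024-01-01", "2024-01-02", 100, 5))  # new row
conn.execute(UPSERT, ("2024-01-01", "2024-01-03", 120, 9))  # newer report: applied
conn.execute(UPSERT, ("2024-01-01", "2024-01-01", 50, 1))   # older report: ignored
row = conn.execute(
    "SELECT day, report_date, impressions, conversions FROM digital_spend"
).fetchone()
```

The conversions value stays at 5 throughout because that column is never part of the SET list.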
反目相谮 2024-12-08 13:02:20

There are multiple answers and here comes yet another answer (YAA). Other answers are not that readable due to the metaprogramming involved. Here is an example that

  • Uses SQLAlchemy ORM

  • Shows how to create a row if there are zero rows using on_conflict_do_nothing

  • Shows how to update the existing row (if any) without creating a new row using on_conflict_do_update

  • Uses the table primary key as the constraint

A longer example appears in the original question that this code relates to.


import datetime

import sqlalchemy as sa
import sqlalchemy.orm as orm
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

class PairState(Base):

    __tablename__ = "pair_state"

    # This table has 1-to-1 relationship with Pair
    pair_id = sa.Column(sa.ForeignKey("pair.id"), nullable=False, primary_key=True, unique=True)
    pair = orm.relationship(Pair,
                        backref=orm.backref("pair_state",
                                        lazy="dynamic",
                                        cascade="all, delete-orphan",
                                        single_parent=True, ), )


    # First raw event in data stream
    first_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # Last raw event in data stream
    last_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # The last hypertable entry added
    last_interval_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    @staticmethod
    def create_first_event_if_not_exist(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Sets the first event value if not exist yet."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, first_event_at=ts).
            on_conflict_do_nothing()
        )

    @staticmethod
    def update_last_event(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the column last_event_at for a named pair."""
        # Based on the original example of https://stackoverflow.com/a/49917004/315168
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_event_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_event_at": ts})
        )

    @staticmethod
    def update_last_interval(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the column last_interval_at for a named pair."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_interval_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_interval_at": ts})
        )
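
The difference between the two conflict actions is easiest to see side by side. This is a minimal sketch using the standard-library sqlite3 module, with integer timestamps and a reduced pair_state table as simplifying assumptions; the function names mirror the static methods above but take a plain connection instead of a Session.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pair_state ("
    "pair_id INTEGER PRIMARY KEY, "
    "first_event_at INTEGER NOT NULL DEFAULT 0, "
    "last_event_at INTEGER NOT NULL DEFAULT 0)"
)

def create_first_event_if_not_exist(conn, pair_id, ts):
    # DO NOTHING: an existing row is left exactly as it is.
    conn.execute(
        "INSERT INTO pair_state (pair_id, first_event_at) VALUES (?, ?) "
        "ON CONFLICT DO NOTHING",
        (pair_id, ts),
    )

def update_last_event(conn, pair_id, ts):
    # DO UPDATE: an existing row gets the new timestamp.
    conn.execute(
        "INSERT INTO pair_state (pair_id, last_event_at) VALUES (?, ?) "
        "ON CONFLICT (pair_id) DO UPDATE SET last_event_at = excluded.last_event_at",
        (pair_id, ts),
    )

create_first_event_if_not_exist(conn, 1, 100)
create_first_event_if_not_exist(conn, 1, 200)  # ignored: row already exists
update_last_event(conn, 1, 300)
update_last_event(conn, 1, 400)
row = conn.execute(
    "SELECT pair_id, first_event_at, last_event_at FROM pair_state"
).fetchone()
```

The first timestamp is written once and never overwritten, while the last timestamp follows every call.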
夏日浅笑〃 2024-12-08 13:02:20

This allows access to the underlying models based on string names

def get_class_by_tablename(tablename):
  """Return class reference mapped to table.
  https://stackoverflow.com/questions/11668355/sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to
  :param tablename: String with name of table.
  :return: Class reference or None.
  """
  for c in Base._decl_class_registry.values():
    if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
      return c


sqla_tbl = get_class_by_tablename(table_name)

def handle_upsert(self, record_dict, table):
    """
    Handles updates when there are primary key conflicts.

    Written as a method: `self.active_session()` is expected to return the
    current SQLAlchemy session.
    """
    session = self.active_session()
    try:
        session.add(table(**record_dict))
        session.flush()  # the INSERT is only emitted on flush
    except Exception:
        # Here we assume the error is an integrity error; catching
        # sqlalchemy.exc.IntegrityError explicitly would be stricter.

        # <update>add explicit error handling for each db engine</update>
        session.rollback()
        # Query for the conflicting record, then update its values from the dict
        c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
        c_tbl_cols = dict(table.__table__.columns) # String:Col Object crosswalk

        c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
        c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols

        c_target_record = session.query(table).filter(*[k==v for (k,v) in c_oo_query_dict.items()]).first()

        # apply new data values to the existing record
        for k, v in record_dict.items():
            setattr(c_target_record, k, v)
雨的味道风的声音 2024-12-08 13:02:20

This works for me with sqlite3 and postgres, although it might fail with composite primary key constraints and will most likely fail with additional unique constraints.

    try:
        t = self._meta.tables[data['table']]
    except KeyError:
        self._log.error('table "%s" unknown', data['table'])
        return

    try:
        q = insert(t).values(data['values'])
        self._log.debug(q)
        self._db.execute(q)
    except IntegrityError:
        self._log.warning('integrity error')
        where_clause = [c == data['values'][c.name] for c in t.c if c.primary_key]
        update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
        q = update(t).where(*where_clause).values(update_dict)
        self._log.debug(q)
        self._db.execute(q)
    except Exception as e:
        self._log.error('%s: %s', t.name, e)
几度春秋 2024-12-08 13:02:20

As we had problems with generated default IDs and references, which led to ForeignKeyViolation errors like

update or delete on table "..." violates foreign key constraint
Key (id)=(...) is still referenced from table "...".

we had to exclude the id from the update dict, as otherwise it would always be regenerated as a new default value.

In addition, the method returns the created/updated entity.

from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert # Important to use the postgresql insert


def upsert(session, data, key_columns, model):

    stmt = insert(model).values(data)
    
    # Important to exclude the ID for update!
    exclude_for_update = [model.id.name, *key_columns]
    update_dict = {c.name: c for c in stmt.excluded if c.name not in exclude_for_update}

    stmt = stmt.on_conflict_do_update(
        index_elements=key_columns,
        set_=update_dict
    ).returning(model)

    orm_stmt = (
        select(model)
        .from_statement(stmt)
        .execution_options(populate_existing=True)
    )

    return session.execute(orm_stmt).scalar()

Example:


class UpsertUser(Base):
    __tablename__ = 'upsert_user'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    name: str = Column(sa.String, nullable=False)
    user_sid: str = Column(sa.String, nullable=False, unique=True)
    house_admin = relationship('UpsertHouse', back_populates='admin', uselist=False)


class UpsertHouse(Base):
    __tablename__ = 'upsert_house'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    admin_id: Id = Column(Id, ForeignKey('upsert_user.id'), nullable=False)
    admin: UpsertUser = relationship('UpsertUser', back_populates='house_admin', uselist=False)

# Usage

upserted_user = upsert(session, updated_user, [UpsertUser.user_sid.name], UpsertUser)

Note: Only tested on postgresql but could work also for other DBs which support ON DUPLICATE KEY UPDATE e.g. MySQL
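
Why leaving the id out of the update set matters can be shown without PostgreSQL. The sketch below uses the standard-library sqlite3 module and a reduced upsert_user table as assumptions: a fresh UUID is generated on every call, the way a Python-side column default would be, yet the stored id of an existing row does not change because id is absent from the SET clause.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE upsert_user ("
    "id TEXT PRIMARY KEY, name TEXT NOT NULL, user_sid TEXT NOT NULL UNIQUE)"
)

def upsert_user(conn, name, user_sid):
    # A fresh id is generated on every call, as a Python-side default would do.
    new_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO upsert_user (id, name, user_sid) VALUES (?, ?, ?) "
        # id is deliberately absent from SET, so an existing row keeps its id.
        "ON CONFLICT (user_sid) DO UPDATE SET name = excluded.name",
        (new_id, name, user_sid),
    )
    return conn.execute(
        "SELECT id, name FROM upsert_user WHERE user_sid = ?", (user_sid,)
    ).fetchone()

first_id, _ = upsert_user(conn, "Alice", "sid-1")
second_id, second_name = upsert_user(conn, "Alice B.", "sid-1")
```

Because the id is stable, rows in other tables that reference it through a foreign key remain valid after the upsert.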

遗失的美好 2024-12-08 13:02:20

In case of sqlite, the sqlite_on_conflict='REPLACE' option can be used when defining a UniqueConstraint, and sqlite_on_conflict_unique for unique constraint on a single column. Then session.add will work in a way just like upsert. See the official documentation.
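
One caveat worth knowing: SQLite's REPLACE conflict resolution deletes the conflicting row and then inserts the new one, rather than updating in place. The sketch below demonstrates this with the standard-library sqlite3 module and raw DDL (the table layout is an assumption; with SQLAlchemy the equivalent constraint clause would come from the sqlite_on_conflict options mentioned above).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE templates ("
    "id INTEGER PRIMARY KEY, "
    "name TEXT UNIQUE ON CONFLICT REPLACE, "
    "description TEXT)"
)
conn.execute("INSERT INTO templates (name, description) VALUES ('default', 'first version')")
# Same name again, but without a description this time.
conn.execute("INSERT INTO templates (name) VALUES ('default')")
rows = conn.execute("SELECT name, description FROM templates").fetchall()
```

The old description is gone because the whole row was replaced, so this only behaves like an upsert when every column is supplied on each write.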

°如果伤别离去 2024-12-08 13:02:20

I use this code for upserts.
Before using this code, you should add primary keys to the table in the database.

from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.inspection import inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.dialects.postgresql import insert

def upsert(df, engine, table_name, schema=None, chunk_size = 1000):

    metadata = MetaData(schema=schema)
    metadata.bind = engine

    table = Table(table_name, metadata, schema=schema, autoload_with=engine)
    
    # only use columns common to both df and table.
    table_columns = {column.name for column in table.columns}
    df_columns = set(df.columns)
    intersection_columns = table_columns.intersection(df_columns)
    
    # pandas expects a list (not a set) as a column indexer
    df1 = df[list(intersection_columns)]
    records  = df1.to_dict('records')

    # get list of fields making up primary key
    primary_keys = [key.name for key in inspect(table).primary_key]
    

    with engine.begin() as conn:  # begin() commits on success and rolls back on error
        chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]
        for chunk in chunks:
            stmt = insert(table).values(chunk)
            update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
            s = stmt.on_conflict_do_update(
                index_elements= primary_keys,
                set_=update_dict)
            conn.execute(s)
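
The chunking step keeps each INSERT statement to a bounded number of rows, which matters because a multi-row VALUES clause uses one bind parameter per cell and drivers and databases generally cap how many a statement may carry. A minimal sketch of just that step, in plain Python with a hypothetical helper name chunked and made-up records:

```python
def chunked(records, chunk_size):
    """Yield consecutive slices of `records`, each at most `chunk_size` long."""
    for start in range(0, len(records), chunk_size):
        yield records[start:start + chunk_size]

records = [{"id": i, "value": i * i} for i in range(2500)]
sizes = [len(chunk) for chunk in chunked(records, 1000)]
```

Using a generator avoids building the full list of chunks up front, which is a small saving when the DataFrame is large.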
祁梦 2024-12-08 13:02:20

I used the following pattern (my use case is simpler):

from sqlalchemy import select
from sqlalchemy.exc import NoResultFound

is_add = False
try:
   stmt = select(Model).where(Model.id == todo.id)
   _ = session.scalars(stmt).one()
except NoResultFound:
   # no existing row with this id, so the record must be inserted
   is_add = True

if is_add:
   pass  # insert the record
else:
   pass  # update the record