Python and postgres using named variables and bulk inserts



I need some help understanding how Python and postgres handle transactions and bulk inserts specifically when inserting several data sets in a single transaction.
Environment:

  • Windows 7 64bit
  • Python 3.2
  • Postgresql 9.1
  • psycopg2

Here is my scenario:
I am converting data from one database (Oracle) into XML strings and inserting that data into a new database (Postgres). This is a large dataset, so I'm trying to optimize some of my inserts. I'm treating a lot of this data as library-type objects, so I have a library table and then tables for the XML metadata and XML content; the fields for this data are text types in the database. I pull the data out of Oracle and then create dictionaries of the data I need to insert. I have 3 insert statements: the first insert creates a record in the library table using a serial id, and that id is needed for the relationship in the next two queries, which insert the XML into the metadata and content tables. Here is an example of what I'm talking about:

for inputKey in libDataDict.keys():
  metaString = libDataDict[inputKey][0]
  contentString = libDataDict[inputKey][1]
  insertLibDataList.append({'objIdent':"%s" % inputKey, 'objName':"%s" % inputKey, 'objType':libType})
  insertMetadataDataList.append({'objIdent':inputKey, 'objMetadata':metaString})
  insertContentDataList.append({'objIdent':inputKey, 'objContent':contentString})

dataDict['cmsLibInsert'] = insertLibDataList
dataDict['cmsLibMetadataInsert'] = insertMetadataDataList
dataDict['cmsLibContentInsert'] = insertContentDataList

sqlDict[0] = {'sqlString':"insert into cms_libraries (cms_library_ident, cms_library_name, cms_library_type_id, cms_library_status_id) \
              values (%(objIdent)s, %(objName)s, (select id from cms_library_types where cms_library_type_name = %(objType)s), \
              (select id from cms_library_status where cms_library_status_name = 'active'))", 'data':dataDict['cmsLibInsert']}

sqlDict[1] = {'sqlString':"insert into cms_library_metadata (cms_library_id, cms_library_metadata_data) values \
              ((select id from cms_libraries where cms_library_ident = %(objIdent)s), $$%(objMetadata)s$$)", \
              'data':dataDict['cmsLibMetadataInsert']}

sqlDict[2] = {'sqlString':"insert into cms_library_content (cms_library_id, cms_library_content_data) values \
              ((select id from cms_libraries where cms_library_ident = %(objIdent)s), $$%(objContent)s$$)", \
              'data':dataDict['cmsLibContentInsert']}

bulkLoadData(myConfig['pgConn'], myConfig['pgCursor'], sqlDict)

The problem I have is when I run the first query(sqlDict[0]) and do the insert everything works fine as long as I do it separate and commit before I run the next two. Ideally I would like all these queries in the same transaction, but it fails because it can't find the id from cms_libraries table for the 2nd and 3rd queries.
Here is my current insert code:

def bulkLoadData(dbConn, dbCursor, sqlDict):
  try:
    libInsertSql = sqlDict.pop(0)
    dbSql = libInsertSql['sqlString']
    data = libInsertSql['data']
    dbCursor.executemany(dbSql, data)
    dbConn.commit()
    for sqlKey in sqlDict:
      dbSql = sqlDict[sqlKey]['sqlString']
      data = sqlDict[sqlKey]['data']
      dbCursor.executemany(dbSql, data)
    dbConn.commit()
  except Exception:
    # error handling trimmed here; roll back so the connection is usable again
    dbConn.rollback()
    raise

Previously I was appending the values into the query and then running a query for each insert. When I do that I can put it all in the same transaction and it finds the generated id and everything is fine. I don't understand why it doesn't find the id when I do the bulk insert with executemany()? Is there a way to do the bulk insert and the other two queries in the same transaction?
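For reference, a simplified sketch of the per-statement pattern I was using (not the exact code, and shown here with bind parameters rather than string concatenation) looked roughly like this, with a single commit at the end:

for row in insertLibDataList:
    dbCursor.execute("insert into cms_libraries (cms_library_ident, cms_library_name, cms_library_type_id, cms_library_status_id) \
                      values (%(objIdent)s, %(objName)s, (select id from cms_library_types where cms_library_type_name = %(objType)s), \
                      (select id from cms_library_status where cms_library_status_name = 'active'))", row)
for row in insertMetadataDataList:
    dbCursor.execute("insert into cms_library_metadata (cms_library_id, cms_library_metadata_data) values \
                      ((select id from cms_libraries where cms_library_ident = %(objIdent)s), %(objMetadata)s)", row)
for row in insertContentDataList:
    dbCursor.execute("insert into cms_library_content (cms_library_id, cms_library_content_data) values \
                      ((select id from cms_libraries where cms_library_ident = %(objIdent)s), %(objContent)s)", row)
dbConn.commit()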

I have been reading this documentation and searching stackoverflow and the internet but have not found an answer to my problem:
psycopg docs
as well as postgres's:
Postgresql string docs

Any help, suggestions, or comments would be appreciated.
Thanks,
Mitch


转身泪倾城 2024-12-17 11:13:02


You have two choices here. Either generate the IDs externally (which allows you to do your bulk inserts) or generate them from the serial (which means you have to do single entry inserts). I think it's pretty straight-forward figuring out external ID generation and bulk loading (although I'd recommend you take a look at an ETL tool rather than hand-coding something in python). If you need to pull IDs from the serial, then you should consider server-side prepared statements.
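As a rough sketch of the external-ID route (reusing the dictionaries and connection objects from your question, and assuming the serial column is backed by the default-named sequence cms_libraries_id_seq, which you should verify), you reserve a block of ids up front and then bulk-load all three tables with the ids already known:

# Reserve one id per library object from the sequence (sequence name assumed here).
dbCursor.execute("SELECT nextval('cms_libraries_id_seq') FROM generate_series(1, %s)",
                 (len(insertLibDataList),))
newIds = [row[0] for row in dbCursor.fetchall()]

# Attach the pre-allocated id to each parent row and build an ident -> id map
# so the dependent rows can reference their parent directly.
identToId = {}
for newId, libRow in zip(newIds, insertLibDataList):
    libRow['id'] = newId
    identToId[libRow['objIdent']] = newId

dbCursor.executemany("""
    INSERT INTO cms_libraries (id, cms_library_ident, cms_library_name, cms_library_type_id, cms_library_status_id)
    VALUES (%(id)s, %(objIdent)s, %(objName)s,
        (SELECT id FROM cms_library_types WHERE cms_library_type_name = %(objType)s),
        (SELECT id FROM cms_library_status WHERE cms_library_status_name = 'active'))
""", insertLibDataList)

for metaRow in insertMetadataDataList:
    metaRow['libId'] = identToId[metaRow['objIdent']]
dbCursor.executemany("""
    INSERT INTO cms_library_metadata (cms_library_id, cms_library_metadata_data)
    VALUES (%(libId)s, %(objMetadata)s)
""", insertMetadataDataList)

# ... same pattern for cms_library_content ...
dbConn.commit()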

If you go the prepared-statement route instead, your first statement should look like the following:

dbCursor.execute("""
PREPARE cms_lib_insert (bigint, text, text) AS 
INSERT INTO cms_libraries (cms_library_ident, cms_library_name, cms_library_type_id, cms_library_status_id)
VALUES ($1, $2,
    (select id from cms_library_types where cms_library_type_name = $3), 
    (select id from cms_library_status where cms_library_status_name = 'active')
)
RETURNING id
""")

You'll run this once, at startup time. Then you'll want to be running the following EXECUTE statement on a per-entry level.

dbCursor.execute("""
EXECUTE cms_lib_insert(%(objIdent)s, %(objName)s, %(objType)s)
""", {'objIdent': 345, 'objName': 'foo', 'objType': 'bar'})
my_new_id = dbCursor.fetchone()[0]

This will return the generated serial id. Going forward, I'd strongly recommend that you get away from the pattern you're currently following of attempting to abstract the database communications (your sqlDict approach) and go with a very direct coding pattern (clever is your enemy here; it makes performance tuning harder).

You'll want to batch your inserts into a block size that works for performance. That means tuning your BLOCK_SIZE based on your actual behavior. Your code should look something like the following:

BLOCK_SIZE = 500
while not_done:
    # psycopg2 opens a transaction implicitly on the first statement,
    # so you only need to commit once per block on the connection.
    for junk in range(BLOCK_SIZE):
        dbCursor.execute("EXECUTE cms_lib_insert(...)")
        cms_lib_id = dbCursor.fetchone()[0]     # you're using this below.
        dbCursor.executemany("EXECUTE metadata_insert(...)")
        dbCursor.executemany("EXECUTE library_insert(...)")
    dbConn.commit()

If you need to achieve performance levels higher than this, the next step is building an insert handler function which takes arrays of rows for the dependent tables. I do not recommend doing this as it quickly becomes a maintenance nightmare.
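For completeness, a rough sketch of such a handler (hypothetical names, building on the cms_lib_insert prepared statement above) might look like:

def loadBlock(dbConn, dbCursor, libRows, metaRows, contentRows):
    # Parent rows go in one at a time so each generated id can be captured.
    ids = {}
    for row in libRows:
        dbCursor.execute("EXECUTE cms_lib_insert(%(objIdent)s, %(objName)s, %(objType)s)", row)
        ids[row['objIdent']] = dbCursor.fetchone()[0]
    # Dependent tables can then be loaded in bulk with the known parent ids.
    dbCursor.executemany(
        "INSERT INTO cms_library_metadata (cms_library_id, cms_library_metadata_data) VALUES (%s, %s)",
        [(ids[r['objIdent']], r['objMetadata']) for r in metaRows])
    dbCursor.executemany(
        "INSERT INTO cms_library_content (cms_library_id, cms_library_content_data) VALUES (%s, %s)",
        [(ids[r['objIdent']], r['objContent']) for r in contentRows])
    dbConn.commit()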
