psycopg2: insert multiple rows with one query

I need to insert multiple rows with one query (the number of rows is not constant), so I need to execute a query like this one:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

The only way I know is:

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

but I want some simpler way.

相守太难 2024-12-22 18:06:00

cursor.copy_from is the fastest solution I've found for bulk inserts by far. Here's a gist I made containing a class named IteratorFile which allows an iterator yielding strings to be read like a file. We can convert each input record to a string using a generator expression. So the solution would be:

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

For this trivial size of args it won't make much of a speed difference, but I see big speedups when dealing with thousands of rows or more. It is also more memory efficient than building a giant query string: the iterator only ever holds one input record in memory at a time, whereas by building the query string you will at some point run out of memory in your Python process or in Postgres.
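
For reference, since the gist itself is not included here, a minimal sketch of what such an IteratorFile class might look like (this is an assumption based on the description above, not the author's exact code):

import io

class IteratorFile(io.TextIOBase):
    """Wrap an iterator of strings so it can be read like a file."""

    def __init__(self, it):
        self._it = it
        self._buf = ""

    def read(self, n=None):
        # Pull lines from the iterator until n characters are buffered
        while n is None or len(self._buf) < n:
            try:
                self._buf += next(self._it) + "\n"
            except StopIteration:
                break
        if n is None:
            data, self._buf = self._buf, ""
        else:
            data, self._buf = self._buf[:n], self._buf[n:]
        return data

    def readline(self):
        # copy_from() may also consume the input line by line
        try:
            return next(self._it) + "\n"
        except StopIteration:
            return ""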

哽咽笑 2024-12-22 18:06:00

A snippet from Psycopg2's tutorial page at Postgresql.org (see bottom):

A last item I would like to show you is how to insert multiple rows using a dictionary. If you had the following:

namedict = ({"first_name":"Joshua", "last_name":"Drake"},
            {"first_name":"Steven", "last_name":"Foo"},
            {"first_name":"David", "last_name":"Bar"})

You could easily insert all three rows within the dictionary by using:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

It doesn't save much code, but it definitely looks better.

呆° 2024-12-22 18:06:00

Security vulnerabilities

As of 2022-11-16, the answers by @Clodoaldo Neto (for Psycopg 2.6), @Joseph Sheedy, @J.J, @Bart Jonk, @kevo Njoki, @TKoutny and @Nihal Sharma contain SQL injection vulnerabilities and should not be used.

The fastest proposal so far (copy_from) should not be used either, because it is difficult to escape the data correctly. This becomes apparent when trying to insert characters like ', ", \, \n, or \t.

The author of psycopg2 also recommends against copy_from:

copy_from() and copy_to() are really just ancient and incomplete methods

The fastest method

The fastest method is cursor.copy_expert, which can insert data straight from CSV files.

with open("mydata.csv") as f:
    cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

copy_expert is also the fastest method when generating the CSV file on-the-fly. For reference, see the following CSVFile class, which takes care to limit memory usage.

import io, csv

class CSVFile(io.TextIOBase):
    # Create a CSV file from rows. Can only be read once.
    def __init__(self, rows, size=8192):
        self.row_iter = iter(rows)
        self.buf = io.StringIO()
        self.available = 0
        self.size = size

    def read(self, n):
        # Buffer new CSV rows until enough data is available
        buf = self.buf
        writer = csv.writer(buf)
        while self.available < n:
            try:
                row_length = writer.writerow(next(self.row_iter))
                self.available += row_length
                self.size = max(self.size, row_length)
            except StopIteration:
                break

        # Read requested amount of data from buffer
        write_pos = buf.tell()
        read_pos = write_pos - self.available
        buf.seek(read_pos)
        data = buf.read(n)
        self.available -= len(data)

        # Shrink buffer if it grew very large
        if read_pos > 2 * self.size:
            remaining = buf.read()
            buf.seek(0)
            buf.write(remaining)
            buf.truncate()
        else:
            buf.seek(write_pos)

        return data

This class can then be used like:

rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))

If all your data fits into memory, you can also generate the entire CSV data directly without the CSVFile class, but if you do not know in advance how much data you will be inserting, you probably should not do that.

f = io.StringIO()
writer = csv.writer(f)
for row in rows:
    writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

Benchmark results

  • 914 milliseconds - many calls to cursor.execute
  • 846 milliseconds - cursor.executemany
  • 362 milliseconds - psycopg2.extras.execute_batch
  • 346 milliseconds - execute_batch with page_size=1000
  • 265 milliseconds - execute_batch with prepared statement
  • 161 milliseconds - psycopg2.extras.execute_values
  • 127 milliseconds - cursor.execute with string-concatenated values
  • 39 milliseconds - copy_expert generating the entire CSV file at once
  • 32 milliseconds - copy_expert with CSVFile
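
For reference, the "execute_batch with prepared statement" entry above can be sketched roughly as follows (a sketch only; the connection string, table, column types, and the statement name ins are illustrative assumptions):

import psycopg2
from psycopg2.extras import execute_batch

conn = psycopg2.connect("dbname=test")  # assumed connection parameters
cur = conn.cursor()

# Prepare the INSERT once on the server, then batch EXECUTE calls against it
cur.execute("PREPARE ins (int, text, text) AS "
            "INSERT INTO mytable (my_id, a, b) VALUES ($1, $2, $3)")
execute_batch(cur, "EXECUTE ins (%s, %s, %s)",
              [(1, "a", "b"), (2, "c", "d")], page_size=1000)
cur.execute("DEALLOCATE ins")
conn.commit()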

筱武穆 2024-12-22 18:06:00

All of these techniques are called "extended inserts" in Postgres terminology, and as of the 24th of November 2016 this approach was still a ton faster than psycopg2's executemany() and all the other methods listed in this thread (which I tried before coming to this answer).

Here's some code which doesn't use cur.mogrify and is nice and simple to get your head around:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
    # row == [1, 'a', 'yolo', ... ]
    sqlrows += row
    if (len(sqlrows) // len(valueSQL)) % rowsPerInsert == 0:
        # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
        insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')'] * rowsPerInsert)
        cur.execute(insertSQL, sqlrows)
        con.commit()
        sqlrows = []
# flush the leftover rows; note the template count is the number of
# remaining rows, not the number of remaining values
if sqlrows:
    insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')'] * (len(sqlrows) // len(valueSQL)))
    cur.execute(insertSQL, sqlrows)
    con.commit()

But it should be noted that if you can use copy_from(), you should use copy_from ;)

简美 2024-12-22 18:06:00

10 years later, an answer for Psycopg 3 and PostgreSQL 14 or newer is: use pipeline mode. Use ordinary INSERTs in execute/executemany statements in Psycopg 3's pipeline mode implementation to be fast and safe against SQL injection. Starting with Psycopg 3.1, executemany() uses pipeline mode internally.

Example with starting pipeline mode and executemany(). Since Psycopg 3.1, conn.pipeline() is only necessary for execute(); it is no longer needed for executemany() if you call it only once.

args = [(1,2), (3,4), (5,6)]
with conn.pipeline():
    cur.executemany("INSERT INTO t (a, b) VALUES (%s, %s)", args)

哆兒滾 2024-12-22 18:06:00

I've been using ant32's answer above for several years. However, I've found that it throws an error in Python 3, because mogrify returns a byte string.

Converting explicitly to byte strings is a simple solution for making the code Python 3 compatible.

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)

伊面 2024-12-22 18:06:00

executemany accepts an array of tuples

https://www.postgresqltutorial.com/postgresql-python/insert/

    """ array of tuples """
    vendor_list = [(value1,)]

    """ insert multiple vendors into the vendors table  """
    sql = "INSERT INTO vendors(vendor_name) VALUES(%s)"
    conn = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # create a new cursor
        cur = conn.cursor()
        # execute the INSERT statement
        cur.executemany(sql,vendor_list)
        # commit the changes to the database
        conn.commit()
        # close communication with the database
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

南薇 2024-12-22 18:06:00

The cursor.copy_from solution as provided by @joseph-sheedy (https://stackoverflow.com/users/958118/joseph-sheedy) above (https://stackoverflow.com/a/30721460/11100064) is indeed lightning fast.

However, the example he gives is not generically usable for a record with any number of fields, and it took me a while to figure out how to use it correctly.

The IteratorFile needs to be instantiated with tab-separated fields like this (r is a list of dicts where each dict is a record):

    f = IteratorFile("{0}\t{1}\t{2}\t{3}\t{4}".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

To generalise to an arbitrary number of fields, we first create a line template with the correct number of tab-separated placeholders, "{}\t{}\t...\t{}", and then use .format() to fill in the field values for each record:

    line = "\t".join(["{}"] * len(records[0]))

    f = IteratorFile(line.format(*list(r.values())) for r in records)

The complete function is in the gist here.
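
Putting the pieces together, a minimal end-to-end sketch (assuming the IteratorFile class from the answer above; the records and the table/column names are illustrative):

    records = [
        {"id": 1, "type": "a", "item": "x", "month": "2024-01", "revenue": 10},
        {"id": 2, "type": "b", "item": "y", "month": "2024-02", "revenue": 20},
    ]

    # One tab-separated template sized to the record width
    line = "\t".join(["{}"] * len(records[0]))
    f = IteratorFile(line.format(*r.values()) for r in records)
    cursor.copy_from(f, "table_name",
                     columns=("id", "type", "item", "month", "revenue"))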

梦里人 2024-12-22 18:06:00

execute_batch has been added to psycopg2 since this question was posted.

It is faster than execute_values.
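
A minimal usage sketch (the connection string and table are illustrative assumptions):

import psycopg2
from psycopg2.extras import execute_batch

conn = psycopg2.connect("dbname=test")
cur = conn.cursor()

args = [(1, 2), (3, 4), (5, 6)]
# execute_batch groups the rows into pages and sends each page to the
# server in a single statement, saving round trips
execute_batch(cur, "INSERT INTO t (a, b) VALUES (%s, %s)", args, page_size=100)
conn.commit()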

怪我鬧 2024-12-22 18:06:00

Another nice and efficient approach is to pass the rows for insertion as a single argument: an array of JSON objects.

E.g. you pass the argument:

[ {id: 18, score: 1}, { id: 19, score: 5} ]

It is an array, which may contain any number of objects inside. Then your SQL looks like:

INSERT INTO links (parent_id, child_id, score) 
SELECT 123, (r->>'id')::int, (r->>'score')::int 
FROM unnest($1::json[]) as r 

Note: your Postgres must be new enough to support json.
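
A sketch of how this might look through psycopg2 (%s takes the place of the $1 placeholder above; the connection string is an assumption, as is passing the batch as a list of JSON strings):

import json
import psycopg2

conn = psycopg2.connect("dbname=test")
cur = conn.cursor()

rows = [{"id": 18, "score": 1}, {"id": 19, "score": 5}]
# psycopg2 adapts the Python list to a Postgres array; the json[] cast
# turns each element into a json value for unnest()
cur.execute(
    """
    INSERT INTO links (parent_id, child_id, score)
    SELECT 123, (r->>'id')::int, (r->>'score')::int
    FROM unnest(%s::json[]) AS r
    """,
    ([json.dumps(r) for r in rows],),
)
conn.commit()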

走走停停 2024-12-22 18:06:00

If you're using SQLAlchemy, you don't need to mess with hand-crafting the string because SQLAlchemy supports generating a multi-row VALUES clause for a single INSERT statement:

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)

音栖息无 2024-12-22 18:06:00

psycopg2 2.9.3

data = "(1, 2), (3, 4), (5, 6)"
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)

or

data = [(1, 2), (3, 4), (5, 6)]
data = ",".join(map(str, data))
query = "INSERT INTO t (a, b) VALUES {0}".format(data)
cursor.execute(query)

终陌 2024-12-22 18:06:00

Psycopg 3 provides a simple way to use Postgres COPY, which is highly efficient.

records = [(1,2), (3,4), (5,6)]
with cursor.copy("COPY example_table (col_a, col_b) FROM STDIN") as copy:
    for record in records:
        copy.write_row(record)

Additional information is available in the official documentation.
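
A self-contained sketch of the same idea (the connection string is an illustrative assumption; the connection context manager commits on a clean exit):

import psycopg

with psycopg.connect("dbname=test") as conn:
    with conn.cursor() as cur:
        records = [(1, 2), (3, 4), (5, 6)]
        with cur.copy("COPY example_table (col_a, col_b) FROM STDIN") as copy:
            for record in records:
                copy.write_row(record)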

殊姿 2024-12-22 18:06:00

If you want to insert multiple rows within one insert statement (assuming you are not using an ORM), the easiest way so far for me would be to use a list of dictionaries. Here is an example:

t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6},
     {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7},
     {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}]

conn.execute("""insert into campaign_dates
                (id, start_date, end_date, campaignid)
                values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);""",
             t)

As you can see only one query will be executed:

INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}]
INFO sqlalchemy.engine.base.Engine COMMIT

时光磨忆 2024-12-22 18:06:00

From @ant32

import psycopg2

def myInsertManyTuples(connection, table, tuple_of_tuples):
    cursor = connection.cursor()
    try:
        # Build a "(%s,%s,...)" template matching the width of one tuple
        insert_len = len(tuple_of_tuples[0])
        insert_template = "(" + ",".join(["%s"] * insert_len) + ")"

        # mogrify each tuple into a fully escaped "(v1,v2,...)" string
        args_str = ",".join(
            cursor.mogrify(insert_template, x).decode("utf-8")
            for x in tuple_of_tuples
        )
        cursor.execute("INSERT INTO " + table + " VALUES " + args_str)
        connection.commit()

    except psycopg2.Error as e:
        print(f"psycopg2.Error in myInsertMany = {e}")
        connection.rollback()

岛歌少女 2024-12-22 18:06:00

The solution I am using can insert about 8000 records in 1 millisecond.

import datetime

curtime = datetime.datetime.now()
postData = dict()
postData["title"] = "This is Title Text"
postData["body"] = "This a Body Text it Can be Long Text"
postData['created_at'] = curtime.isoformat()
postData['updated_at'] = curtime.isoformat()
data = []
for x in range(8000):
    data.append(postData)
vals = []
for d in data:
    vals.append(tuple(d.values()))  # Here we extract the Values from the Dict
flds = ",".join(map(str, data[0]))    # column names from the dict keys
tableFlds = ",".join(map(str, vals))  # stringified "(v1, v2, ...)" tuples
sqlStr = f"INSERT INTO posts ({flds}) VALUES {tableFlds}"
db.execute(sqlStr)
connection.commit()
rowsAffected = db.rowcount
print(f'{rowsAffected} Rows Affected')

情泪▽动烟 2024-12-22 18:06:00

Finally, in SQLAlchemy version 1.2, this new implementation was added to use psycopg2.extras.execute_batch() instead of executemany when you initialize your engine with use_batch_mode=True, like:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

Then someone who has to use SQLAlchemy won't need to bother trying different combinations of sqla, psycopg2 and direct SQL together.

征棹 2024-12-22 18:06:00

Using aiopg - The snippet below works perfectly fine

    # items = [10, 11, 12, 13]
    # gid = 1
    tup = [(gid, pid) for pid in items]
    args_str = ",".join([str(s) for s in tup])
    # insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
    yield from cur.execute("INSERT INTO group VALUES " + args_str)

三岁铭 2024-12-22 18:05:59

I built a program that inserts multiple lines to a server that was located in another city.

I found out that using this method was about 10 times faster than executemany. In my case tup is a tuple containing about 2000 rows. It took about 10 seconds when using this method:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

and 2 minutes when using this method:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)

无声情话 2024-12-22 18:05:59

New execute_values method in Psycopg 2.7:

import psycopg2.extras

data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values(
    cursor, insert_query, data, template=None, page_size=100
)

The pythonic way of doing it in Psycopg 2.6:

data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
cursor.execute(insert_query, data)

Explanation: If the data to be inserted is given as a list of tuples like in

data = [(1,'x'), (2,'y')]

then it is already in the exact required format as

  1. the values syntax of the insert clause expects a list of records as in

    insert into t (a, b) values (1, 'x'),(2, 'y')

  2. Psycopg adapts a Python tuple to a Postgresql record.

The only necessary work is to provide a records list template to be filled by psycopg

# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))

and place it in the insert query

insert_query = 'insert into t (a, b) values {}'.format(records_list_template)

Printing the insert_query outputs

insert into t (a, b) values %s,%s

Now to the usual Psycopg arguments substitution

cursor.execute(insert_query, data)

Or just testing what will be sent to the server

print (cursor.mogrify(insert_query, data).decode('utf8'))

Output:

insert into t (a, b) values (1, 'x'),(2, 'y')

往日 2024-12-22 18:05:59

Update with psycopg2 2.7:

The classic executemany() is about 60 times slower than @ant32 's implementation (called "folded") as explained in this thread: https://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

This implementation was added to psycopg2 in version 2.7 and is called execute_values():

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

Previous Answer:

To insert multiple rows, using the multirow VALUES syntax with execute() is about 10x faster than using psycopg2 executemany(). Indeed, executemany() just runs many individual INSERT statements.

@ant32's code works perfectly in Python 2. But in Python 3, cursor.mogrify() returns bytes, cursor.execute() takes either bytes or strings, and ','.join() expects str instances.

So in Python 3 you may need to modify @ant32 's code, by adding .decode('utf-8'):

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)

Or by using bytes (with b'' or b"") only:

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 