我需要用一个查询插入多行(行数不是常量),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我唯一知道的办法就是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要更简单的方法。


当前回答

如果你想在一个insert语句中插入多行(假设你没有使用ORM),到目前为止对我来说最简单的方法是使用字典列表。这里有一个例子:

 t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6},
      {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7},
      {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}]

conn.execute("insert into campaign_dates
             (id, start_date, end_date, campaignid) 
              values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);",
             t)

正如你所看到的,只会执行一个查询:

INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}]
INFO sqlalchemy.engine.base.Engine COMMIT

其他回答

来自Psycopg2教程页面Postgresql.org的一个片段(见底部):

我想向您展示的最后一项是如何使用字典插入多行。如果你有以下情况:

namedict = ({"first_name":"Joshua", "last_name":"Drake"},
            {"first_name":"Steven", "last_name":"Foo"},
            {"first_name":"David", "last_name":"Bar"})

你可以很容易地将这三行都插入到字典中:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

它没有节省多少代码,但它确实看起来更好。

最后,在SQLalchemy1.2版本中,这个新实现被添加到使用psycopg2.extras.execute_batch()而不是executemany来初始化引擎时使用use_batch_mode=True,例如:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

然后有人将不得不使用SQLalchmey不会费心尝试sqla和psycopg2和直接SQL的不同组合在一起。

安全漏洞

截至2022-11-16,@Clodoaldo Neto (Psycopg 2.6), @Joseph Sheedy, @J。J, @Bart Jonk, @kevo Njoki, @TKoutny和@Nihal Sharma包含SQL注入漏洞,不应使用。

目前为止最快的建议(copy_from)也不应该使用,因为它很难正确地转义数据。当尝试插入',",\n, \, \t或\n这样的字符时,这很容易看出。

psycopg2的作者也建议不要使用copy_from:

Copy_from()和copy_to()实际上只是古老且不完整的方法

最快的方法

最快的方法是游标。copy_expert,它可以直接从CSV文件插入数据。

with open("mydata.csv") as f:
    cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

copy_expert也是动态生成CSV文件时最快的方法。作为参考,请参阅下面的CSVFile类,该类注意限制内存使用。

import io, csv

class CSVFile(io.TextIOBase):
    # Create a CSV file from rows. Can only be read once.
    def __init__(self, rows, size=8192):
        self.row_iter = iter(rows)
        self.buf = io.StringIO()
        self.available = 0
        self.size = size

    def read(self, n):
        # Buffer new CSV rows until enough data is available
        buf = self.buf
        writer = csv.writer(buf)
        while self.available < n:
            try:
                row_length = writer.writerow(next(self.row_iter))
                self.available += row_length
                self.size = max(self.size, row_length)
            except StopIteration:
                break

        # Read requested amount of data from buffer
        write_pos = buf.tell()
        read_pos = write_pos - self.available
        buf.seek(read_pos)
        data = buf.read(n)
        self.available -= len(data)

        # Shrink buffer if it grew very large
        if read_pos > 2 * self.size:
            remaining = buf.read()
            buf.seek(0)
            buf.write(remaining)
            buf.truncate()
        else:
            buf.seek(write_pos)

        return data

这个类可以这样使用:

rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))

如果所有数据都适合内存,您也可以直接生成整个CSV数据,而不需要使用CSVFile类,但是如果您不知道将来要插入多少数据,则可能不应该这样做。

f = io.StringIO()
writer = csv.writer(f)
for row in rows:
    writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

基准测试结果

914毫秒——多次调用cursor.execute 846毫秒——cursor.executemany 362毫秒- psycopg2.extras.execute_batch 346毫秒——execute_batch with page_size=1000 265毫秒——execute_batch带有预处理语句 161毫秒- psycopg2.extras.execute_values 127毫秒——游标。使用字符串连接的值执行 39毫秒- copy_expert一次生成整个CSV文件 32毫秒- copy_expert with CSVFile

另一种有效的方法是将rows作为1参数传递给insert, 也就是数组的json对象。

例如,你传递的论点:

[ {id: 18, score: 1}, { id: 19, score: 5} ]

它是一个数组,其中可以包含任意数量的对象。 然后你的SQL看起来像这样:

INSERT INTO links (parent_id, child_id, score) 
SELECT 123, (r->>'id')::int, (r->>'score')::int 
FROM unnest($1::json[]) as r 

注意:你的postgress必须足够新,才能支持json

几年来,我一直在使用ant32的答案。然而,我发现它在python 3中抛出了一个错误,因为mogrify返回一个字节字符串。

显式转换为bytse字符串是使代码与python 3兼容的简单解决方案。

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)