我需要用一个查询插入多行(行数不是常量),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我唯一知道的办法就是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要更简单的方法。


当前回答

安全漏洞

截至2022-11-16,@Clodoaldo Neto (Psycopg 2.6), @Joseph Sheedy, @J。J, @Bart Jonk, @kevo Njoki, @TKoutny和@Nihal Sharma包含SQL注入漏洞,不应使用。

目前为止最快的建议(copy_from)也不应该使用,因为它很难正确地转义数据。当尝试插入',",\n, \, \t或\n这样的字符时,这很容易看出。

psycopg2的作者也建议不要使用copy_from:

Copy_from()和copy_to()实际上只是古老且不完整的方法

最快的方法

最快的方法是游标。copy_expert,它可以直接从CSV文件插入数据。

with open("mydata.csv") as f:
    cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

copy_expert也是动态生成CSV文件时最快的方法。作为参考,请参阅下面的CSVFile类,该类注意限制内存使用。

import io, csv

class CSVFile(io.TextIOBase):
    # Create a CSV file from rows. Can only be read once.
    def __init__(self, rows, size=8192):
        self.row_iter = iter(rows)
        self.buf = io.StringIO()
        self.available = 0
        self.size = size

    def read(self, n):
        # Buffer new CSV rows until enough data is available
        buf = self.buf
        writer = csv.writer(buf)
        while self.available < n:
            try:
                row_length = writer.writerow(next(self.row_iter))
                self.available += row_length
                self.size = max(self.size, row_length)
            except StopIteration:
                break

        # Read requested amount of data from buffer
        write_pos = buf.tell()
        read_pos = write_pos - self.available
        buf.seek(read_pos)
        data = buf.read(n)
        self.available -= len(data)

        # Shrink buffer if it grew very large
        if read_pos > 2 * self.size:
            remaining = buf.read()
            buf.seek(0)
            buf.write(remaining)
            buf.truncate()
        else:
            buf.seek(write_pos)

        return data

这个类可以这样使用:

rows = [(1, "a", "b"), (2, "c", "d")]
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", CSVFile(rows))

如果所有数据都适合内存,您也可以直接生成整个CSV数据,而不需要使用CSVFile类,但是如果您不知道将来要插入多少数据,则可能不应该这样做。

f = io.StringIO()
writer = csv.writer(f)
for row in rows:
    writer.writerow(row)
f.seek(0)
cursor.copy_expert("COPY mytable (my_id, a, b) FROM STDIN WITH csv", f)

基准测试结果

914毫秒——多次调用cursor.execute 846毫秒——cursor.executemany 362毫秒- psycopg2.extras.execute_batch 346毫秒——execute_batch with page_size=1000 265毫秒——execute_batch带有预处理语句 161毫秒- psycopg2.extras.execute_values 127毫秒——游标。使用字符串连接的值执行 39毫秒- copy_expert一次生成整个CSV文件 32毫秒- copy_expert with CSVFile

其他回答

我构建了一个程序,可以向位于另一个城市的服务器插入多行代码。

我发现使用这种方法比任何执行方法都快10倍。在我的例子中,tup是一个包含大约2000行的元组。使用这种方法大约需要10秒:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

使用此方法时2分钟:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)

最后,在SQLalchemy1.2版本中,这个新实现被添加到使用psycopg2.extras.execute_batch()而不是executemany来初始化引擎时使用use_batch_mode=True,例如:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

然后有人将不得不使用SQLalchmey不会费心尝试sqla和psycopg2和直接SQL的不同组合在一起。

如果你想在一个insert语句中插入多行(假设你没有使用ORM),到目前为止对我来说最简单的方法是使用字典列表。这里有一个例子:

 t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6},
      {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7},
      {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}]

conn.execute("insert into campaign_dates
             (id, start_date, end_date, campaignid) 
              values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);",
             t)

正如你所看到的,只会执行一个查询:

INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}]
INFO sqlalchemy.engine.base.Engine COMMIT

从@ant32

def myInsertManyTuples(connection, table, tuple_of_tuples):
    cursor = connection.cursor()
    try:
        insert_len = len(tuple_of_tuples[0])
        insert_template = "("
        for i in range(insert_len):
            insert_template += "%s,"
        insert_template = insert_template[:-1] + ")"

        args_str = ",".join(
            cursor.mogrify(insert_template, x).decode("utf-8")
            for x in tuple_of_tuples
        )
        cursor.execute("INSERT INTO " + table + " VALUES " + args_str)
        connection.commit()

    except psycopg2.Error as e:
        print(f"psycopg2.Error in myInsertMany = {e}")
        connection.rollback()

几年来,我一直在使用ant32的答案。然而,我发现它在python 3中抛出了一个错误,因为mogrify返回一个字节字符串。

显式转换为bytse字符串是使代码与python 3兼容的简单解决方案。

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)