我需要用一个查询插入多行(行数不是常量),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我唯一知道的办法就是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要更简单的方法。


当前回答

如果您正在使用SQLAlchemy,则不需要手工制作字符串,因为SQLAlchemy支持为单个INSERT语句生成多行VALUES子句:

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)

其他回答

所有这些技术在Postgres术语中都被称为“扩展插入”,截至2016年11月24日,它仍然比psychopg2的executemany()和这个线程中列出的所有其他方法快得多(在得到这个答案之前我尝试过)。

下面是一些不使用cur.mogrify的代码,很好,很简单:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()

但需要注意的是,如果可以使用copy_from(),则应该使用copy_from;)

我使用的解决方案可以在1毫秒内插入8000条记录

curtime = datetime.datetime.now()
postData = dict()
postData["title"] = "This is Title Text"
postData["body"] = "This a Body Text it Can be Long Text"
postData['created_at'] = curtime.isoformat()
postData['updated_at'] = curtime.isoformat()
data = []
for x in range(8000):
    data.append(((postData)))
vals = []
for d in postData:
    vals.append(tuple(d.values())) #Here we extract the Values from the Dict
flds = ",".join(map(str, postData[0]))
tableFlds =  ",".join(map(str, vals))
sqlStr = f"INSERT INTO posts ({flds}) VALUES {tableFlds}"
db.execute(sqlStr)
connection.commit()
rowsAffected = db.rowcount
print(f'{rowsAffected} Rows Affected')

Execute_batch在这个问题发布后已经添加到psycopg2。

它比execute_values快。

光标。由@ joseph提供的复制解决方案。Sheedy (https://stackoverflow.com/users/958118/joseph-sheedy)以上(https://stackoverflow.com/a/30721460/11100064)确实是闪电般快。

然而,他给出的例子不适用于具有任意数量字段的记录,我花了一些时间才弄清楚如何正确使用它。

IteratorFile需要用制表符分隔的字段实例化,就像这样(r是一个字典列表,其中每个字典都是一条记录):

    f = IteratorFile("{0}\t{1}\t{2}\t{3}\t{4}".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

为了泛化任意数量的字段,我们将首先创建一个具有正确数量的制表符和字段占位符的行字符串:"{}\t{}\t{}....\t{}",然后使用.format()为记录中的r填充字段值:*list(r.values())):

        line = "\t".join(["{}"] * len(records[0]))

        f = IteratorFile(line.format(*list(r.values())) for r in records)

在主旨这里完成功能。

我构建了一个程序,可以向位于另一个城市的服务器插入多行代码。

我发现使用这种方法比任何执行方法都快10倍。在我的例子中,tup是一个包含大约2000行的元组。使用这种方法大约需要10秒:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

使用此方法时2分钟:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)