我需要用一个查询插入多行(行数不是常量),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我唯一知道的办法就是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要更简单的方法。


当前回答

游标。copy_from是迄今为止我发现的用于批量插入的最快解决方案。下面是我做的一个要点,包含一个名为IteratorFile的类,它允许迭代器产生的字符串像文件一样读取。我们可以使用生成器表达式将每个输入记录转换为字符串。所以解是

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

对于这种微不足道的参数大小,它不会产生太大的速度差异,但当处理数千行以上时,我看到了很大的加速。它也比构建一个巨大的查询字符串更节省内存。迭代器一次只能在内存中保存一条输入记录,在某些时候,在Python进程或Postgres中构建查询字符串会耗尽内存。

其他回答

如果您正在使用SQLAlchemy,则不需要手工制作字符串,因为SQLAlchemy支持为单个INSERT语句生成多行VALUES子句:

rows = []
for i, name in enumerate(rawdata):
    row = {
        'id': i,
        'name': name,
        'valid': True,
    }
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)

光标。由@ joseph提供的复制解决方案。Sheedy (https://stackoverflow.com/users/958118/joseph-sheedy)以上(https://stackoverflow.com/a/30721460/11100064)确实是闪电般快。

然而,他给出的例子不适用于具有任意数量字段的记录,我花了一些时间才弄清楚如何正确使用它。

IteratorFile需要用制表符分隔的字段实例化,就像这样(r是一个字典列表,其中每个字典都是一条记录):

    f = IteratorFile("{0}\t{1}\t{2}\t{3}\t{4}".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

为了泛化任意数量的字段,我们将首先创建一个具有正确数量的制表符和字段占位符的行字符串:"{}\t{}\t{}....\t{}",然后使用.format()为记录中的r填充字段值:*list(r.values())):

        line = "\t".join(["{}"] * len(records[0]))

        f = IteratorFile(line.format(*list(r.values())) for r in records)

在主旨这里完成功能。

最后,在SQLalchemy1.2版本中,这个新实现被添加到使用psycopg2.extras.execute_batch()而不是executemany来初始化引擎时使用use_batch_mode=True,例如:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

然后有人将不得不使用SQLalchmey不会费心尝试sqla和psycopg2和直接SQL的不同组合在一起。

使用aiopg -下面的代码段工作得非常好

    # items = [10, 11, 12, 13]
    # group = 1
    tup = [(gid, pid) for pid in items]
    args_str = ",".join([str(s) for s in tup])
    # insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
    yield from cur.execute("INSERT INTO group VALUES " + args_str)

我使用的解决方案可以在1毫秒内插入8000条记录

curtime = datetime.datetime.now()
postData = dict()
postData["title"] = "This is Title Text"
postData["body"] = "This a Body Text it Can be Long Text"
postData['created_at'] = curtime.isoformat()
postData['updated_at'] = curtime.isoformat()
data = []
for x in range(8000):
    data.append(((postData)))
vals = []
for d in postData:
    vals.append(tuple(d.values())) #Here we extract the Values from the Dict
flds = ",".join(map(str, postData[0]))
tableFlds =  ",".join(map(str, vals))
sqlStr = f"INSERT INTO posts ({flds}) VALUES {tableFlds}"
db.execute(sqlStr)
connection.commit()
rowsAffected = db.rowcount
print(f'{rowsAffected} Rows Affected')