I want to upload a huge number of entries (~600k) into a simple table in a PostgreSQL DB, with one foreign key, a timestamp and 3 floats per entry. However, it takes 60 ms per entry to execute the core bulk insert described here, so the whole run would take about 10 h. I have found out that this is a known performance issue of the executemany() method, which has been solved by the execute_values() method introduced in psycopg2 2.7.
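For reference, this is roughly how execute_values() is used directly with psycopg2; the connection string and column names below are placeholders for my actual schema, and values is the list of dicts mentioned below:

import psycopg2
from psycopg2.extras import execute_values

# Placeholder DSN and schema; `values` is the list of ~600k dicts.
conn = psycopg2.connect("dbname=mydb user=me password=secret host=localhost")
with conn.cursor() as cur:
    # All rows are folded into a few multi-row INSERTs instead of
    # one round trip per row.
    execute_values(
        cur,
        "INSERT INTO simple_table (fk_id, ts, a, b, c) VALUES %s",
        [(v["fk_id"], v["ts"], v["a"], v["b"], v["c"]) for v in values],
        page_size=1000,
    )
conn.commit()
conn.close()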
The code I run is the following:
# build a huge list of dicts, one dict for each entry
engine.execute(SimpleTable.__table__.insert(), values)  # around 600k dicts in a list
I see that this is a common problem, but I have not managed to find a solution in SQLAlchemy itself. Is there any way to tell SQLAlchemy to call execute_values() in such cases? Is there any other way to implement huge inserts without constructing the SQL statements myself?
Thanks for the help!
psycopg2 is over 2x faster than SQLAlchemy on a small table. This behavior is expected, as psycopg2 is a database driver for PostgreSQL, while SQLAlchemy is a general ORM library.
The SQLAlchemy engine is a global object which can be created and configured once and then reused for different operations. The first step in establishing a connection with the PostgreSQL database is creating an engine object using the create_engine() function of SQLAlchemy.
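A minimal sketch of that one-time setup; the connection URL is a placeholder:

from sqlalchemy import create_engine

# Placeholder connection URL; build the engine once and reuse it everywhere.
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydb")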
Meanwhile this became possible (from SQLAlchemy 1.2.0) with the use_batch_mode flag on the create_engine() function. See the docs. It uses the execute_batch() function from psycopg2.extras.
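A minimal sketch of how that would look for the insert from the question; the connection URL is a placeholder:

from sqlalchemy import create_engine

# Placeholder connection URL; use_batch_mode is available from SQLAlchemy 1.2.0.
engine = create_engine(
    "postgresql+psycopg2://user:password@localhost:5432/mydb",
    use_batch_mode=True,  # executemany() calls go through psycopg2's execute_batch()
)

# The insert from the question then runs in batches instead of
# one round trip per row.
engine.execute(SimpleTable.__table__.insert(), values)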
Not the answer you are looking for in the sense that this does not address attempting to instruct SQLAlchemy to use the psycopg2 extras, and requires a degree of manual SQL, but: you can access the underlying psycopg2 connections from an engine with raw_connection(), which allows using COPY FROM:
import io
import csv

from psycopg2 import sql


def bulk_copy(engine, table, values):
    csv_file = io.StringIO()
    headers = list(values[0].keys())
    writer = csv.DictWriter(csv_file, headers)
    writer.writerows(values)

    csv_file.seek(0)

    # NOTE: `format()` here is *not* `str.format()`, but
    # `SQL.format()`. Never use plain string formatting.
    copy_stmt = sql.SQL("COPY {} (" +
                        ",".join(["{}"] * len(headers)) +
                        ") FROM STDIN CSV").\
        format(sql.Identifier(str(table.name)),
               *(sql.Identifier(col) for col in headers))

    # Fetch a raw psycopg connection from the SQLAlchemy engine
    conn = engine.raw_connection()
    try:
        with conn.cursor() as cur:
            cur.copy_expert(copy_stmt, csv_file)

        conn.commit()
    except:
        conn.rollback()
        raise
    finally:
        conn.close()
and then
bulk_copy(engine, SimpleTable.__table__, values)
This should be plenty fast compared to executing INSERT statements; moving 600,000 records on this machine took around 8 seconds, ~13 µs per record. Alternatively, you could also use the raw connection and cursor together with the extras package.
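For completeness, a rough sketch of that extras-based route, again via raw_connection(); the column names are placeholders and values is the same list of dicts as in the question:

from psycopg2.extras import execute_values

# Placeholder column names; `values` is the list of dicts from the question.
conn = engine.raw_connection()
try:
    with conn.cursor() as cur:
        execute_values(
            cur,
            "INSERT INTO simple_table (fk_id, ts, a, b, c) VALUES %s",
            [(v["fk_id"], v["ts"], v["a"], v["b"], v["c"]) for v in values],
            page_size=1000,
        )
    conn.commit()
finally:
    conn.close()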