I have a Flask application with a RESTful API. One of the API calls is a 'mass upsert' call with a JSON payload. I am struggling with performance.
The first thing I tried was to use merge_result on a Query object, because...
This is an optimized method which will merge all mapped instances, preserving the structure of the result rows and unmapped columns with less method overhead than that of calling Session.merge() explicitly for each value.
This was the initial code:
class AdminApiUpdateTasks(Resource):
    """Bulk task creation / update endpoint"""

    def put(self, slug):
        taskdata = json.loads(request.data)
        existing = db.session.query(Task).filter_by(challenge_slug=slug)
        existing.merge_result(
            [task_from_json(slug, **task) for task in taskdata])
        db.session.commit()
        return {}, 200
A request to that endpoint with ~5000 records, all of them already existing in the database, takes more than 11 minutes to return:
real 11m36.459s
user 0m3.660s
sys 0m0.391s
As this would be a fairly typical use case, I started looking into alternatives to improve performance. Against my better judgement, I tried calling session.merge() for each individual record:
class AdminApiUpdateTasks(Resource):
    """Bulk task creation / update endpoint"""

    def put(self, slug):
        # Get the posted data
        taskdata = json.loads(request.data)
        for task in taskdata:
            db.session.merge(task_from_json(slug, **task))
        db.session.commit()
        return {}, 200
To my surprise, this turned out to be more than twice as fast:
real 4m33.945s
user 0m3.608s
sys 0m0.258s
I have two questions:
Why is the second approach, which uses merge, faster than the supposedly optimized first one that uses merge_result?

SQLAlchemy is very, very fast. It's just that users tend to be unaware of just how much functionality is being delivered, and confuse an ORM result set with that of a raw database cursor.
SQLAlchemy is great because it provides a good connection / pooling infrastructure; a good Pythonic query building infrastructure; and then a good ORM infrastructure that is capable of complex queries and mappings (as well as some pretty stone-simple ones).
_sa_instance_state is a non-database-persisted value used by SQLAlchemy internally; it refers to the InstanceState for the instance. While not directly relevant to this section, if we want to get at it, we should use the inspect() function to access it.
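As a small illustration of that last point, here is a minimal sketch of reaching the InstanceState through inspect() instead of reading the attribute directly. The Item model is hypothetical and exists only for this example; the import of declarative_base from sqlalchemy.orm assumes SQLAlchemy 1.4 or later.

```python
from sqlalchemy import Column, Integer, String, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Item(Base):
    # Hypothetical model, used only to have a mapped instance to inspect.
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


item = Item(id=1, name="demo")

# inspect() on a mapped instance returns its InstanceState,
# the same object stored under item._sa_instance_state.
state = inspect(item)

# The object has not been added to any session yet, so it is "transient".
print(state.transient)
```

The state object also exposes flags such as pending, persistent and detached, which tend to be more convenient than poking at the private attribute.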
This is an old question, but I hope this answer can still help people.
I used the same idea as the example provided by SQLAlchemy, but I added benchmarking for UPSERT operations (update the record if it exists, otherwise insert it). The results on a PostgreSQL 11 database are below:
Tests to run: test_customer_individual_orm_select, test_customer_batched_orm_select, test_customer_batched_orm_select_add_all, test_customer_batched_orm_merge_result
test_customer_individual_orm_select : UPSERT statements via individual checks on whether objects exist and add new objects individually (10000 iterations); total time 9.359603 sec
test_customer_batched_orm_select : UPSERT statements via batched checks on whether objects exist and add new objects individually (10000 iterations); total time 1.553555 sec
test_customer_batched_orm_select_add_all : UPSERT statements via batched checks on whether objects exist and add new objects in bulk (10000 iterations); total time 1.358680 sec
test_customer_batched_orm_merge_result : UPSERT statements using batched merge_results (10000 iterations); total time 7.191284 sec
As you can see, merge_result is far from the most efficient option. I'd suggest checking in batches whether the records exist and should be updated. Hope this helps! The full benchmark script follows at the end of this answer.
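Applied to the payload from the question, the batched check might look roughly like the sketch below. This is not the asker's actual code: the Task columns (identifier, instruction), the upsert_tasks helper and the batch size are all assumptions made for illustration, and an in-memory SQLite database stands in for the real one.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Task(Base):
    # Hypothetical stand-in for the question's Task model.
    __tablename__ = "task"
    id = Column(Integer, primary_key=True)
    challenge_slug = Column(String(72))
    identifier = Column(String(72))
    instruction = Column(String(255))


def upsert_tasks(session, slug, taskdata, batch_size=500):
    """Update tasks that already exist and add the rest, one SELECT per batch."""
    for start in range(0, len(taskdata), batch_size):
        batch = taskdata[start:start + batch_size]
        identifiers = [payload["identifier"] for payload in batch]
        # One query loads every existing task for this batch.
        existing = {
            task.identifier: task
            for task in session.query(Task).filter(
                Task.challenge_slug == slug,
                Task.identifier.in_(identifiers),
            )
        }
        for payload in batch:
            task = existing.get(payload["identifier"])
            if task is not None:
                task.instruction = payload["instruction"]
            else:
                task = Task(challenge_slug=slug, **payload)
                session.add(task)
                # Remember it so a repeated identifier updates instead of duplicating.
                existing[task.identifier] = task
        session.flush()
    session.commit()


engine = create_engine("sqlite://")  # in-memory database, for illustration only
Base.metadata.create_all(engine)
session = Session(engine)
session.add(Task(challenge_slug="demo", identifier="a", instruction="old"))
session.commit()

upsert_tasks(session, "demo", [
    {"identifier": "a", "instruction": "new"},
    {"identifier": "b", "instruction": "fresh"},
])
rows = {task.identifier: task.instruction for task in session.query(Task)}
print(rows)
```

The point of the design is that the number of SELECT statements grows with the number of batches rather than the number of records, which is where the per-record merge spends most of its time.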
"""
This series of tests illustrates different ways to UPSERT
or INSERT ON CONFLICT UPDATE a large number of rows in bulk.
"""
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session

# Assumption: Profiler is the helper from SQLAlchemy's examples/performance
# suite, available locally as profiler.py.
from profiler import Profiler

Base = declarative_base()
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))
    description = Column(String(255))


Profiler.init("bulk_upserts", num=100000)


@Profiler.setup
def setup_database(dburl, echo, num):
    global engine
    engine = create_engine(dburl, echo=echo)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)

    s = Session(engine)
    for chunk in range(0, num, 10000):
        # Insert half of the customers we want to merge
        s.bulk_insert_mappings(
            Customer,
            [
                {
                    "id": i,
                    "name": "customer name %d" % i,
                    "description": "customer description %d" % i,
                }
                for i in range(chunk, chunk + 10000, 2)
            ],
        )
    s.commit()


@Profiler.profile
def test_customer_individual_orm_select(n):
    """
    UPSERT statements via individual checks on whether objects exist
    and add new objects individually
    """
    session = Session(bind=engine)
    for i in range(0, n):
        customer = session.query(Customer).get(i)
        if customer:
            customer.description += "updated"
        else:
            session.add(Customer(
                id=i,
                name=f"customer name {i}",
                description=f"customer description {i} new"
            ))
        session.flush()
    session.commit()


@Profiler.profile
def test_customer_batched_orm_select(n):
    """
    UPSERT statements via batched checks on whether objects exist
    and add new objects individually
    """
    session = Session(bind=engine)
    for chunk in range(0, n, 1000):
        customers = {
            c.id: c for c in
            session.query(Customer)\
                .filter(Customer.id.between(chunk, chunk + 1000))
        }
        for i in range(chunk, chunk + 1000):
            if i in customers:
                customers[i].description += "updated"
            else:
                session.add(Customer(
                    id=i,
                    name=f"customer name {i}",
                    description=f"customer description {i} new"
                ))
        session.flush()
    session.commit()


@Profiler.profile
def test_customer_batched_orm_select_add_all(n):
    """
    UPSERT statements via batched checks on whether objects exist
    and add new objects in bulk
    """
    session = Session(bind=engine)
    for chunk in range(0, n, 1000):
        customers = {
            c.id: c for c in
            session.query(Customer)\
                .filter(Customer.id.between(chunk, chunk + 1000))
        }
        to_add = []
        for i in range(chunk, chunk + 1000):
            if i in customers:
                customers[i].description += "updated"
            else:
                to_add.append({
                    "id": i,
                    "name": "customer name %d" % i,
                    "description": "customer description %d new" % i,
                })
        if to_add:
            session.bulk_insert_mappings(
                Customer,
                to_add
            )
            to_add = []
        session.flush()
    session.commit()


@Profiler.profile
def test_customer_batched_orm_merge_result(n):
    "UPSERT statements using batched merge_results"
    session = Session(bind=engine)
    for chunk in range(0, n, 1000):
        customers = session.query(Customer)\
            .filter(Customer.id.between(chunk, chunk + 1000))
        customers.merge_result(
            Customer(
                id=i,
                name=f"customer name {i}",
                description=f"customer description {i} new"
            ) for i in range(chunk, chunk + 1000)
        )
        session.flush()
    session.commit()
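
The module docstring mentions INSERT ON CONFLICT UPDATE, but none of the tests above actually issue one. For completeness, here is a minimal sketch of that variant using SQLAlchemy Core. It is not part of the benchmark and was not timed. To stay runnable without a server it uses the SQLite dialect's insert() (assuming SQLAlchemy 1.4+ and SQLite 3.24+); the PostgreSQL dialect provides an insert() with the same on_conflict_do_update method. The upsert_customers helper is a name made up for this example.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine
from sqlalchemy.dialects.sqlite import insert  # dialect-specific insert construct

metadata = MetaData()
customer = Table(
    "customer",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(255)),
    Column("description", String(255)),
)

engine = create_engine("sqlite://")  # in-memory database, for illustration only
metadata.create_all(engine)


def upsert_customers(conn, rows):
    """Insert rows; where the id already exists, overwrite name and description."""
    stmt = insert(customer).values(rows)
    stmt = stmt.on_conflict_do_update(
        index_elements=[customer.c.id],
        set_={
            # "excluded" refers to the row that was proposed for insertion.
            "name": stmt.excluded.name,
            "description": stmt.excluded.description,
        },
    )
    conn.execute(stmt)


with engine.begin() as conn:
    upsert_customers(conn, [
        {"id": 1, "name": "customer name 1", "description": "original"},
    ])
    upsert_customers(conn, [
        {"id": 1, "name": "customer name 1", "description": "updated"},
        {"id": 2, "name": "customer name 2", "description": "new"},
    ])
    result = [
        tuple(row)
        for row in conn.execute(customer.select().order_by(customer.c.id))
    ]

print(result)
```

This pushes the exists-or-not decision into the database, so no SELECT round trip is needed at all. Whether it beats the batched ORM approach for a given workload would have to be measured.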