Looking for a better strategy for an SQLAlchemy bulk upsert

I have a Flask application with a RESTful API. One of the API calls is a 'mass upsert' call with a JSON payload. I am struggling with performance.

The first thing I tried was to use merge_result on a Query object because, according to the SQLAlchemy documentation:

This is an optimized method which will merge all mapped instances, preserving the structure of the result rows and unmapped columns with less method overhead than that of calling Session.merge() explicitly for each value.

This was the initial code:

class AdminApiUpdateTasks(Resource):

    """Bulk task creation / update endpoint"""

    def put(self, slug):
        taskdata = json.loads(request.data)
        existing = db.session.query(Task).filter_by(challenge_slug=slug)
        existing.merge_result(
            [task_from_json(slug, **task) for task in taskdata])
        db.session.commit()
        return {}, 200

A request to that endpoint with ~5000 records, all of them already existing in the database, takes more than 11m to return:

real    11m36.459s
user    0m3.660s
sys 0m0.391s

As this would be a fairly typical use case, I started looking into alternatives to improve performance. Against my better judgement, I tried merging each individual record into the session:

class AdminApiUpdateTasks(Resource):

    """Bulk task creation / update endpoint"""

    def put(self, slug):
        # Get the posted data
        taskdata = json.loads(request.data)
        for task in taskdata:
            db.session.merge(task_from_json(slug, **task))
        db.session.commit()
        return {}, 200

To my surprise, this turned out to be more than twice as fast:

real    4m33.945s
user    0m3.608s
sys 0m0.258s

I have two questions:

  1. Why is the second strategy using merge faster than the supposedly optimized first one that uses merge_result?
  2. What other strategies should I pursue to optimize this more, if any?
asked May 26 '14 at 21:05 by mvexel


1 Answer

This is an old question, but I hope this answer can still help people.

I used the same idea as this example from SQLAlchemy, but I added benchmarking for UPSERT operations (update the record if it already exists, otherwise insert it). The results below were obtained on a PostgreSQL 11 database:

Tests to run: test_customer_individual_orm_select, test_customer_batched_orm_select, test_customer_batched_orm_select_add_all, test_customer_batched_orm_merge_result
test_customer_individual_orm_select : UPSERT statements via individual checks on whether objects exist and add new objects individually (10000 iterations); total time 9.359603 sec
test_customer_batched_orm_select : UPSERT statements via batched checks on whether objects exist and add new objects individually (10000 iterations); total time 1.553555 sec
test_customer_batched_orm_select_add_all : UPSERT statements via batched checks on whether objects exist and add new objects in bulk (10000 iterations); total time 1.358680 sec
test_customer_batched_orm_merge_result : UPSERT statements using batched merge_results (10000 iterations); total time 7.191284 sec

As you can see, merge_result is far from the most efficient option. I'd suggest checking in batches whether the records already exist, updating the ones that do and inserting the rest in bulk. The full benchmark script is below. Hope this helps!

"""
This series of tests illustrates different ways to UPSERT
or INSERT ON CONFLICT UPDATE a large number of rows in bulk.
"""
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
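# Profiler is the benchmarking helper from SQLAlchemy's performance examples
# (examples/performance); it is assumed here to be importable as a local `profiler` module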
from profiler import Profiler


Base = declarative_base()
engine = None


class Customer(Base):
  __tablename__ = "customer"
  id = Column(Integer, primary_key=True)
  name = Column(String(255))
  description = Column(String(255))


Profiler.init("bulk_upserts", num=100000)


@Profiler.setup
def setup_database(dburl, echo, num):
  global engine
  engine = create_engine(dburl, echo=echo)
  Base.metadata.drop_all(engine)
  Base.metadata.create_all(engine)

  s = Session(engine)
  for chunk in range(0, num, 10000):
    # Insert half of the customers we want to merge
    s.bulk_insert_mappings(
      Customer,
      [
        {
          "id": i,
          "name": "customer name %d" % i,
          "description": "customer description %d" % i,
        }
        for i in range(chunk, chunk + 10000, 2)
      ],
    )
  s.commit()


@Profiler.profile
def test_customer_individual_orm_select(n):
  """
  UPSERT statements via individual checks on whether objects exist
  and add new objects individually
  """
  session = Session(bind=engine)
  for i in range(0, n):
    customer = session.query(Customer).get(i)
    if customer:
      customer.description += "updated"
    else:
      session.add(Customer(
          id=i,
          name=f"customer name {i}",
          description=f"customer description {i} new"
      ))
    session.flush()
  session.commit()

@Profiler.profile
def test_customer_batched_orm_select(n):
  """
  UPSERT statements via batched checks on whether objects exist
  and add new objects individually
  """
  session = Session(bind=engine)
  for chunk in range(0, n, 1000):
    customers = {
        c.id: c for c in
        session.query(Customer)\
            .filter(Customer.id.between(chunk, chunk + 1000))
    }
    for i in range(chunk, chunk + 1000):
      if i in customers:
        customers[i].description += "updated"
      else:
        session.add(Customer(
            id=i,
            name=f"customer name {i}",
            description=f"customer description {i} new"
        ))
    session.flush()
  session.commit()

@Profiler.profile
def test_customer_batched_orm_select_add_all(n):
  """
  UPSERT statements via batched checks on whether objects exist
  and add new objects in bulk
  """
  session = Session(bind=engine)
  for chunk in range(0, n, 1000):
    customers = {
        c.id: c for c in
        session.query(Customer)\
            .filter(Customer.id.between(chunk, chunk + 1000))
    }
    to_add = []
    for i in range(chunk, chunk + 1000):
      if i in customers:
        customers[i].description += "updated"
      else:
        to_add.append({
            "id": i,
            "name": "customer name %d" % i,
            "description": "customer description %d new" % i,
        })
    if to_add:
      session.bulk_insert_mappings(
        Customer,
        to_add
      )
      to_add = []
    session.flush()
  session.commit()

@Profiler.profile
def test_customer_batched_orm_merge_result(n):
  "UPSERT statements using batched merge_results"
  session = Session(bind=engine)
  for chunk in range(0, n, 1000):
    customers = session.query(Customer)\
        .filter(Customer.id.between(chunk, chunk + 1000))
    customers.merge_result(
      Customer(
          id=i,
          name=f"customer name {i}",
          description=f"customer description {i} new"
      ) for i in range(chunk, chunk + 1000)
    )
    session.flush()
  session.commit()
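
For completeness: since these benchmarks ran against PostgreSQL, another strategy worth measuring is the dialect-specific INSERT ... ON CONFLICT DO UPDATE, which performs the whole upsert in a single statement per batch instead of selecting the existing rows first. The sketch below is not part of the benchmark above; it reuses the Customer model and the 1000-row chunking purely for illustration, and the helper name is made up.

from sqlalchemy.dialects.postgresql import insert as pg_insert

def upsert_customers_on_conflict(session, n, chunk_size=1000):
  """Hypothetical helper: chunked upsert via PostgreSQL ON CONFLICT DO UPDATE."""
  for chunk in range(0, n, chunk_size):
    rows = [
        {
            "id": i,
            "name": f"customer name {i}",
            "description": f"customer description {i} new",
        }
        for i in range(chunk, chunk + chunk_size)
    ]
    stmt = pg_insert(Customer.__table__).values(rows)
    stmt = stmt.on_conflict_do_update(
        # "id" is the primary key, so it drives conflict detection
        index_elements=[Customer.id],
        # on conflict, overwrite the existing row with the incoming values
        set_={
            "name": stmt.excluded["name"],
            "description": stmt.excluded["description"],
        },
    )
    session.execute(stmt)
  session.commit()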
answered Nov 15 '22 at 14:11 by Ruben Helsloot