I am trying to find a way to use SQLAlchemy events to call an external API when an attribute is updated and persisted to the database. Here is my context:
A User model has an attribute named birthday. When an instance of the User model is updated and saved, I want to call an external API to update this user's birthday accordingly.
I've tried Attribute Events; however, they generate too many hits, and there is no way to guarantee that a set/remove attribute event will eventually be persisted (autocommit is set to False, and the transaction gets rolled back when errors occur).
Session Events would not work either, because they require a Session/SessionFactory as a parameter, and sessions are used in too many places in the codebase.
I have looked through all the SQLAlchemy ORM event hooks in the official documentation, but I couldn't find one that satisfies my requirement.
I wonder if anyone else has any insight into how to implement this kind of combined event trigger in SQLAlchemy. Thanks.
You can do this by combining multiple events. The specific events you need to use depend on your particular application, but the basic idea is this:
1. [InstanceEvents.load] when an instance is loaded, note down the fact that it was loaded and not added to the session later (we only want to save the initial state if the instance was loaded)
2. [AttributeEvents.set/append/remove] when an attribute changes, note down the fact that it was changed, and, if necessary, what it was changed from (these first two steps are optional if you don't need the initial state)
3. [SessionEvents.before_flush] when a flush happens, note down which instances are actually being saved
4. [SessionEvents.before_commit] before a commit completes, note down the current state of the instance (because you may not have access to it anymore after the commit)
5. [SessionEvents.after_commit] after a commit completes, fire off the custom event handler and clear the instances that you saved

An interesting challenge is the ordering of the events. If you do a session.commit() without doing a session.flush(), you'll notice that the before_commit event fires before the before_flush event, which is different from the scenario where you do a session.flush() before session.commit(). The solution is to call session.flush() in your before_commit call to force the ordering. This is probably not 100% kosher, but it works for me in production.
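The ordering difference described above can be observed directly. The following is a minimal sketch, assuming an in-memory SQLite database; the Person model and the record_event_order helper are hypothetical names used only for this demonstration and are not part of the answer's code.

```python
from sqlalchemy import Column, Integer, String, create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Person(Base):  # hypothetical model, used only for this demo
    __tablename__ = "people"
    id = Column(Integer, primary_key=True)
    name = Column(String)


def record_event_order(flush_first):
    """Return the order in which the two session hooks fire for one commit."""
    engine = create_engine("sqlite://")
    Base.metadata.create_all(bind=engine)
    session = sessionmaker(bind=engine)()
    fired = []

    # Listeners are attached to this one session instance only.
    @event.listens_for(session, "before_flush")
    def _before_flush(session, flush_context, instances):
        fired.append("before_flush")

    @event.listens_for(session, "before_commit")
    def _before_commit(session):
        fired.append("before_commit")

    session.add(Person(name="x"))
    if flush_first:
        session.flush()
    session.commit()
    session.close()
    return fired


print(record_event_order(flush_first=False))  # ['before_commit', 'before_flush']
print(record_event_order(flush_first=True))   # ['before_flush', 'before_commit']
```

When the commit has to flush pending work itself, before_commit comes first; when everything was already flushed, the commit finds a clean session and before_flush does not fire again.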
Here's a (simple) diagram of the ordering of events:
begin
    load
        (save initial state)
    set attribute
    ...
    flush
    set attribute
    ...
    flush
    ...
    (save modified state)
commit
    (fire off "object saved and changed" event)
from itertools import chain
from weakref import WeakKeyDictionary, WeakSet

from sqlalchemy import Column, String, Integer, create_engine
from sqlalchemy import event
from sqlalchemy.orm import sessionmaker, object_session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
engine = create_engine("sqlite://")
Session = sessionmaker(bind=engine)


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    birthday = Column(String)


@event.listens_for(User.birthday, "set", active_history=True)
def _record_initial_state(target, value, old, initiator):
    session = object_session(target)
    if session is None:
        return
    # only track instances that were loaded, not ones that are still new
    if target not in session.info.get("loaded_instances", set()):
        return
    initial_state = session.info.setdefault("initial_state", WeakKeyDictionary())
    # this is where you save the entire object's state, not necessarily just the birthday attribute
    initial_state.setdefault(target, old)


@event.listens_for(User, "load")
def _record_loaded_instances_on_load(target, context):
    session = object_session(target)
    loaded_instances = session.info.setdefault("loaded_instances", WeakSet())
    loaded_instances.add(target)


@event.listens_for(Session, "before_flush")
def track_instances_before_flush(session, context, instances):
    modified_instances = session.info.setdefault("modified_instances", WeakSet())
    for obj in chain(session.new, session.dirty):
        if session.is_modified(obj) and isinstance(obj, User):
            modified_instances.add(obj)


@event.listens_for(Session, "before_commit")
def set_pending_changes_before_commit(session):
    session.flush()  # IMPORTANT: forces before_flush to run before the code below
    # setdefault (not get) so the baseline written below is kept in session.info
    initial_state = session.info.setdefault("initial_state", WeakKeyDictionary())
    # pop with a default: before_flush does not fire when there is nothing to flush
    modified_instances = session.info.pop("modified_instances", set())
    pending_changes = session.info["pending_changes"] = []
    for obj in modified_instances:
        initial = initial_state.get(obj)
        current = obj.birthday
        pending_changes.append({
            "initial": initial,
            "current": current,
        })
        # the committed value becomes the baseline for the next transaction
        initial_state[obj] = current


@event.listens_for(Session, "after_commit")
def after_commit(session):
    pending_changes = session.info.pop("pending_changes", [])
    for changes in pending_changes:
        print(changes)  # this is where you would fire your custom event
    # everything still in the identity map now counts as loaded
    loaded_instances = session.info["loaded_instances"] = WeakSet()
    for v in session.identity_map.values():
        if isinstance(v, User):
            loaded_instances.add(v)


def main():
    engine = create_engine("sqlite://", echo=False)
    Base.metadata.create_all(bind=engine)
    session = Session(bind=engine)

    user = User(birthday="foo")
    session.add(user)
    user.birthday = "bar"
    session.flush()
    user.birthday = "baz"
    session.commit()  # prints: {'initial': None, 'current': 'baz'}

    user.birthday = "foobar"
    session.commit()  # prints: {'initial': 'baz', 'current': 'foobar'}
    session.close()


if __name__ == "__main__":
    main()
As you can see, it's a little complicated and not very ergonomic. It would be nicer if it were integrated into the ORM, but I also understand there may be reasons for not doing so.
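For the step marked "this is where you would fire your custom event", here is one possible sketch of a handler that could replace the print call. The names dispatch_birthday_changes and send_update are hypothetical; send_update stands in for whatever client function calls the external API. The sketch assumes that a failed API call should be logged rather than raised, since the commit has already succeeded by the time after_commit runs.

```python
import logging

logger = logging.getLogger(__name__)


def dispatch_birthday_changes(pending_changes, send_update):
    """Call send_update once per real change; return the number of successful calls.

    pending_changes is a list of {"initial": ..., "current": ...} dicts, as built
    in the before_commit listener. send_update is a hypothetical callable that
    wraps the external API.
    """
    sent = 0
    for change in pending_changes:
        if change["initial"] == change["current"]:
            continue  # the attribute was set but ended up unchanged
        try:
            send_update(change)
            sent += 1
        except Exception:
            # The database commit is already done, so log instead of raising.
            logger.exception("external update failed for %r", change)
    return sent


# Usage with a stand-in for the API client: collect the calls in a list.
calls = []
changes = [
    {"initial": None, "current": "baz"},
    {"initial": "baz", "current": "baz"},
]
print(dispatch_birthday_changes(changes, calls.append))  # 1
```

Note that the change dicts in the answer carry only the old and new values; a real integration would probably also need to record an identifier for the user in before_commit so the external API knows which record to update.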