I have a series of related tables maintained by Python/SQLAlchemy. If I delete a row in a particular table, I want the ability to reverse that delete at some point in the future, in case of a mistake. I could do this using an is_deleted column and filtering on it, but that becomes a pain when I'm querying other tables for related data. I could add an is_deleted column to all the other tables, and when a row from the main table is deleted, toggle them all. But then every query on every table would have to filter on is_deleted. It can be done, but I'm hoping there's a better strategy.
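For illustration, here's a minimal sketch of the is_deleted approach I'm describing (hypothetical Parent/Child models, assuming SQLAlchemy 1.4+) and why it gets painful:

```python
from sqlalchemy import Boolean, Column, ForeignKey, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

class Parent(Base):
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    is_deleted = Column(Boolean, default=False, nullable=False)

class Child(Base):
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id"))
    is_deleted = Column(Boolean, default=False, nullable=False)
    parent = relationship("Parent")

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
s = Session(engine)

# "deleting" p2 means toggling is_deleted on it and all its children
p1, p2 = Parent(), Parent(is_deleted=True)
s.add_all([p1, p2, Child(parent=p1), Child(parent=p2, is_deleted=True)])
s.commit()

# the pain point: the filter has to be repeated for every table in the query
live_children = (
    s.query(Child)
    .join(Parent)
    .filter(Child.is_deleted == False, Parent.is_deleted == False)
    .all()
)
```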
One thought would be to move all the deleted data to another set of tables, that only store deleted data. But it's unclear to me if SQLAlchemy allows me to switch the table a particular object is associated with. I think this would be the preferred solution, but I don't know if it can be done.
Another thought is that I could run a second database, and copy the deleted data over. But that adds a layer of administration complexity that I'd like to avoid.
Any thoughts would be appreciated.
A lot of people do the "is_deleted" thing, and I agree I'm not a fan of it either, though we do have a recipe for it at PreFilteredQuery.
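For what it's worth, the idea behind that recipe can be sketched with the modern `with_loader_criteria()` option applied in a `do_orm_execute` event hook (assuming SQLAlchemy 1.4+; `Thing` here is a hypothetical model), so that individual queries never have to remember the filter:

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base, with_loader_criteria

Base = declarative_base()

class Thing(Base):
    __tablename__ = "thing"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    is_deleted = Column(Boolean, default=False, nullable=False)

# attach the filter to every SELECT the Session runs, so individual
# queries don't need to repeat "is_deleted == False"
@event.listens_for(Session, "do_orm_execute")
def _filter_deleted(execute_state):
    if execute_state.is_select:
        execute_state.statement = execute_state.statement.options(
            with_loader_criteria(
                Thing, Thing.is_deleted == False, include_aliases=True
            )
        )

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
s = Session(engine)
s.add_all([Thing(name="keep"), Thing(name="gone", is_deleted=True)])
s.commit()

# no explicit is_deleted filter, yet the soft-deleted row is hidden
names = [t.name for t in s.query(Thing).all()]
```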
What you're looking for, as someone else suggested, is a "versioning" recipe. We have a comprehensive example of storing copies of data in a separate versioned table introduced at Versioned Objects in the SQLAlchemy documentation.
Here, I've adapted some of the techniques used in that example to produce a more direct recipe that specifically tracks only "deleted" objects, and includes a "restore" feature which will restore a given row back to the main table. So it's not so much "SQLAlchemy allows me to switch the table a particular object is associated with"; it's more that another mapped class is created, resembling the primary one, which can then be used to reverse the deletion as you request. Everything below the `if __name__ == '__main__':` line is a proof-of-concept.
```python
from sqlalchemy.orm import Session, object_session
from sqlalchemy import event


def preserve_deleted(class_):
    """Copy deleted rows of the decorated class to a parallel
    "<tablename>_deleted" table, from which they can be restored.

    NOTE: relies on a module-level declarative ``Base`` existing
    before the decorator is applied.
    """

    def copy_col(col):
        newcol = col.copy()
        newcol.constraints = set()
        return newcol

    keys = class_.__table__.c.keys()
    cols = dict(
        (col.key, copy_col(col)) for col in class_.__table__.c
    )
    cols['__tablename__'] = "%s_deleted" % class_.__table__.name

    class History(object):
        def restore(self):
            # move the row back: delete it from the "deleted" table
            # and insert a fresh copy into the primary table
            sess = object_session(self)
            sess.delete(self)
            sess.add(copy_inst(self, class_))

    hist_class = type(
        '%sDeleted' % class_.__name__,
        (History, Base),
        cols)

    def copy_inst(fromobj, tocls):
        return tocls(**dict(
            (key, getattr(fromobj, key))
            for key in keys
        ))

    @event.listens_for(Session, 'before_flush')
    def check_deleted(session, flush_context, instances):
        # whenever an instance of the decorated class is deleted,
        # add a copy of it to the "deleted" table in the same flush
        for del_ in session.deleted:
            if isinstance(del_, class_):
                h = copy_inst(del_, hist_class)
                session.add(h)

    class_.deleted = hist_class
    return class_


if __name__ == '__main__':
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, ForeignKey, Integer, String
    from sqlalchemy.orm import relationship, Session
    from sqlalchemy import create_engine

    Base = declarative_base()

    @preserve_deleted
    class A(Base):
        __tablename__ = "a"

        id = Column(Integer, primary_key=True)
        data1 = Column(String)
        data2 = Column(String)

    @preserve_deleted
    class B(Base):
        __tablename__ = 'b'

        id = Column(Integer, primary_key=True)
        data1 = Column(String)
        a_id = Column(Integer, ForeignKey('a.id'))
        a = relationship("A")

    e = create_engine('sqlite://', echo=True)
    Base.metadata.create_all(e)

    s = Session(e)

    a1, a2, a3, a4 = \
        A(data1='a1d1', data2='a1d2'),\
        A(data1='a2d1', data2='a2d2'),\
        A(data1='a3d1', data2='a3d2'),\
        A(data1='a4d1', data2='a4d2')
    b1, b2, b3, b4 = \
        B(data1='b1', a=a1),\
        B(data1='b2', a=a1),\
        B(data1='b3', a=a3),\
        B(data1='b4', a=a4)

    s.add_all([
        a1, a2, a3, a4,
        b1, b2, b3, b4
    ])
    s.commit()

    assert s.query(A.id).order_by(A.id).all() == [(1, ), (2, ), (3, ), (4, )]
    assert s.query(B.id).order_by(B.id).all() == [(1, ), (2, ), (3, ), (4, )]

    # delete some rows; copies land in "a_deleted" / "b_deleted"
    s.delete(a2)
    s.delete(b2)
    s.delete(b3)
    s.delete(a3)
    s.commit()

    assert s.query(A.id).order_by(A.id).all() == [(1, ), (4, )]
    assert s.query(B.id).order_by(B.id).all() == [(1, ), (4, )]

    # restore a2, a3 and b3 from the "deleted" tables
    a2_deleted = s.query(A.deleted).filter(A.deleted.id == 2).one()
    a2_deleted.restore()

    b3_deleted = s.query(B.deleted).filter(B.deleted.id == 3).one()
    a3_deleted = s.query(A.deleted).filter(A.deleted.id == 3).one()
    b3_deleted.restore()
    a3_deleted.restore()
    s.commit()

    assert s.query(A.id).order_by(A.id).all() == [(1, ), (2, ), (3, ), (4, )]
    assert s.query(B.id).order_by(B.id).all() == [(1, ), (3, ), (4, )]
```