 

Reflected SQLAlchemy metadata in celery tasks?

For better testability and other reasons, it is good to keep the SQLAlchemy database session configuration non-global, as described very well in the following question:

how to setup sqlalchemy session in celery tasks with no global variable (also discussed in https://github.com/celery/celery/issues/3561)

Now the question is: how to handle metadata elegantly? If my understanding is correct, metadata can be obtained once, e.g.:

engine = create_engine(DB_URL, encoding='utf-8', pool_recycle=3600,
                       pool_size=10)
# db_session = get_session()  # this is old global session
meta = MetaData()
meta.reflect(bind=engine)

Reflecting on each task execution is not good for performance reasons; metadata is a more or less stable and thread-safe structure (as long as we only read it).

However, the metadata sometimes changes (Celery is not the "owner" of the db schema), causing errors in workers.

What could be an elegant way to deal with meta in a testable way, while still being able to react to underlying db changes? (Alembic is in use, if that is relevant.)

I was thinking of using an Alembic version change as a signal to re-reflect, but I am not quite sure how to make it work nicely in Celery. For instance, if more than one worker senses a change at once, the global meta may be mutated in a non-thread-safe way. A sketch of the idea follows.
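For illustration, a minimal sketch of what I have in mind, assuming the standard alembic_version table that Alembic maintains, with re-reflection guarded by a per-process lock (names are illustrative, not a vetted solution):

import threading
from sqlalchemy import MetaData, text

_meta_lock = threading.Lock()
_cache = {'version': None, 'meta': None}

def get_meta(engine):
    # Read the current Alembic revision from the table Alembic maintains.
    with engine.connect() as conn:
        version = conn.execute(
            text("SELECT version_num FROM alembic_version")).scalar()
    # Serialize re-reflection so threads within one worker do not race;
    # prefork workers are separate processes with their own cache anyway.
    with _meta_lock:
        if _cache['version'] != version:
            meta = MetaData()
            meta.reflect(bind=engine)
            _cache['version'] = version
            _cache['meta'] = meta
        return _cache['meta']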

If it matters, Celery use in this case is standalone; there are no web framework modules/apps in the Celery app. The problem is also simplified because only SQLAlchemy Core is in use, not the object mapper.

asked Sep 29 '18 by Roman Susi




1 Answer

This is only a partial solution, and it is for the SQLAlchemy ORM (but I guess something similar is easy to implement for Core).

Main points (a combined sketch follows the list):

  • the engine is at module level, but its configuration (access URL, parameters) comes from os.environ
  • the session is created in its own factory function
  • at module level: BaseModel = automap_base(); table classes then use that BaseModel as superclass, usually with just one attribute, __tablename__, but arbitrary relationships and attributes can be added there (very similar to normal ORM use)
  • at module level: BaseModel.prepare(ENGINE, reflect=True)
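Put together, the module-level setup might look roughly like this (DB_URL and the Order class are illustrative assumptions; prepare(ENGINE, reflect=True) is the SQLAlchemy 1.x automap API mentioned above):

import os
from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import sessionmaker

# Engine at module level; configuration comes from the environment.
ENGINE = create_engine(os.environ['DB_URL'], pool_recycle=3600, pool_size=10)

def get_session():
    # Factory function: each call returns a fresh session bound to ENGINE.
    return sessionmaker(bind=ENGINE)()

BaseModel = automap_base()

class Order(BaseModel):  # hypothetical table class
    __tablename__ = 'orders'
    # arbitrary relationships and attributes can be added here,
    # very similar to normal ORM use

BaseModel.prepare(ENGINE, reflect=True)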

Tests (using pytest) inject the environment variable (e.g. DB_URL) in conftest.py at module level.
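A minimal conftest.py sketch (the sqlite URL is an assumption):

# conftest.py
import os

# Must run before the module that creates ENGINE is imported.
os.environ['DB_URL'] = 'sqlite:///test.db'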

One important point: the database session is always created (that is, the factory function is called) in the task function and passed explicitly into all functions. This naturally delimits units of work, usually one transaction per task. It also simplifies testing, because every database-using function can be given either a fake or a real (test) database session.

"Task function" is the above is a function, which is called in the function, which is decorated by task - this way task function can be tested without task machinery.

This is only a partial solution, because re-reflection is not covered. If the task workers can be stopped for a moment (the database experiences downtime during schema changes anyway), this usually poses no problem, since these are background tasks. Workers can also be restarted by some external watchdog, which can monitor database changes. This can be made convenient by using supervisord or some other way to control Celery workers running in the foreground.

All in all, after solving the problem as described above, I value the "explicit is better than implicit" philosophy even more. All those magical "app"s and "request"s, be it in Celery or Flask, may bring minuscule abbreviations in function signatures, but I would rather pass some kind of context down the call chain for improved testability and better understanding and management of context.

answered Oct 15 '22 by Roman Susi