
SQLAlchemy - Multithreaded Persistent Object Creation, how to merge back into single session to avoid state conflict?

I have tens (potentially hundreds) of thousands of persistent objects that I want to generate in a multithreaded fashion due to the processing required.

While the objects are created in separate threads (using the Flask-SQLAlchemy extension with scoped sessions), the call that writes the generated objects to the DB happens in one place, after generation has completed.

The problem, I believe, is that the objects being created take part in several existing relationships, which triggers their automatic addition to the identity map even though they are created in separate, concurrent threads with no explicit session in any of them.

I was hoping to contain the generated objects in a single list, and then write the whole list (using a single session object) to the database. This results in an error like this:

AssertionError: A conflicting state is already present in the identity map for key (<class 'app.ModelObject'>, (1L,))

This is why I believe the identity map has already been populated: the assertion error is triggered when I try to add and commit using the global session, outside of the concurrent code.

The final detail is that I cannot find a way to get a reference to whatever session object(s) are involved (scoped or otherwise; I don't fully understand how automatic addition to the identity map works with multithreading), so even if I wanted to deal with a separate session per process, I couldn't.

Any advice is greatly appreciated. The only reason I am not posting code (yet) is because it's difficult to abstract a working example immediately out of my app. I will post if somebody really needs to see it though.

Peter M. Elias asked Jun 26 '13 02:06



1 Answer

With a scoped session, each session is thread-local; in other words, there is a separate session for each thread. Instances that you pass to another thread do not belong to the receiving thread's session, so from its point of view they are "detached". Use db.session.add_all(objects) in the receiving thread to put them all back.
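As a rough illustration of "build in workers, attach in one place", here is a minimal sketch using plain SQLAlchemy rather than Flask-SQLAlchemy. It assumes SQLAlchemy 1.4 or newer and an in-memory SQLite database; the `Item` model and the `build_item` helper are hypothetical names made up for the example. The worker threads never touch a session, so the objects they return are not attached to any session until the main thread calls `add_all`.

```python
from concurrent.futures import ThreadPoolExecutor

from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Item(Base):
    """Hypothetical model standing in for the real application model."""
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    name = Column(String)


def build_item(n):
    # Runs in a worker thread. No session is used here, so the
    # returned object is not attached to any session.
    return Item(name=f"item-{n}")


engine = create_engine("sqlite://")  # in-memory database, assumption for the demo
Base.metadata.create_all(engine)

# Generate the objects concurrently.
with ThreadPoolExecutor(max_workers=4) as pool:
    items = list(pool.map(build_item, range(100)))

# Attach and write everything from a single session in the main thread.
with Session(engine) as session:
    session.add_all(items)
    session.commit()
    count = session.scalar(select(func.count()).select_from(Item))
```

This only stays conflict-free as long as the workers do not assign the new objects to relationships of instances that already live in some session, since that cascades the new objects into that session.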

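For some reason, it looks like you're creating objects with the same identity (primary key columns) in different threads, then trying to send them both to the database. One option is to fix why this is happening, so that identities are guaranteed to be unique. You may also try merging: merged_object = db.session.merge(other_object, load=False). Note that load=False skips the database lookup but expects the given instance to have no pending changes, so for freshly built objects the default load=True is probably the safer choice.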
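To make the merge suggestion concrete, here is a small sketch in plain SQLAlchemy (assuming version 1.4 or newer and an in-memory SQLite database; the `Item` model is a hypothetical stand-in). It creates a second object with the same primary key as one the session already holds, then merges it instead of adding it. `merge` copies the state onto the instance already in the identity map and returns that instance, and the default `load=True` is used because the duplicate is a new, modified object.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Item(Base):
    """Hypothetical model standing in for the real application model."""
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine("sqlite://")  # in-memory database, assumption for the demo
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Item(id=1, name="original"))
    session.commit()

    # This instance lives in the session's identity map under key (Item, 1).
    loaded = session.get(Item, 1)

    # A second object with the same primary key, e.g. built in a worker.
    duplicate = Item(id=1, name="from worker")

    # merge() copies the duplicate's state onto the instance the session
    # already tracks and returns that tracked instance.
    merged = session.merge(duplicate)
    session.commit()

    result = (merged is loaded, loaded.name)
```

After the merge, keep working with the returned object (`merged`), not with the one that was passed in.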

Edit: zzzeek's comment clued me in on something else that may be going on:

With Flask-SQLAlchemy, the session is tied to the app context. Since that is thread local, spawning a new thread will invalidate the context; there will be no database session in the threads. All the instances are detached there, and cannot properly track relationships. One solution is to pass app to each thread and perform everything within a with app.app_context(): block. Inside the block, first use db.session.add to populate the local session with the passed instances. You should still merge in the master task afterwards to ensure consistency.

davidism answered Nov 14 '22 22:11