 

How to handle SQLAlchemy Connections in ProcessPool?

I have a reactor that fetches messages from a RabbitMQ broker and triggers worker methods to process these messages in a process pool, something like this:

Reactor (code omitted)

This is implemented using python asyncio, loop.run_in_executor() and concurrent.futures.ProcessPoolExecutor.
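For reference, a minimal sketch of that pattern; fetch_from_rabbitmq, handle_message, and the pool size are illustrative placeholders, not the actual code:

# reactor.py (illustrative sketch)
import asyncio
from concurrent.futures import ProcessPoolExecutor

def handle_message(body):
    # synchronous worker code; runs in one of the pool processes
    ...

async def reactor(loop, pool):
    while True:
        body = await fetch_from_rabbitmq()  # placeholder for the broker call
        loop.run_in_executor(pool, handle_message, body)

pool = ProcessPoolExecutor(max_workers=4)
loop = asyncio.get_event_loop()
loop.run_until_complete(reactor(loop, pool))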

Now I want to access the database in the worker methods using SQLAlchemy. Mostly the processing will be very straightforward and quick CRUD operations.

The reactor will process 10-50 messages per second in the beginning, so it is not acceptable to open a new database connection for every request. Rather I would like to maintain one persistent connection per process.

My questions are: How can I do this? Can I just store them in a global variable? Will the SQA connection pool handle this for me? How to clean up when the reactor stops?

[Update]

  • The database is MySQL with InnoDB.

Why choose this pattern with a process pool?

The current implementation uses a different pattern where each consumer runs in its own thread. Somehow this does not work very well. There are already about 200 consumers, each running in its own thread, and the system is growing quickly. To scale better, the idea was to separate concerns: consume messages in an I/O loop and delegate the processing to a pool. Of course, the performance of the whole system is mainly I/O bound. However, CPU is an issue when processing large result sets.

The other reason was "ease of use." While the connection handling and consumption of messages is implemented asynchronously, the code in the worker can be synchronous and simple.

Soon it became evident that accessing remote systems through persistent network connections from within the worker is an issue. This is what the CommunicationChannels are for: inside the worker, I can send requests to the message bus through these channels.

One of my current ideas is to handle DB access in a similar way: pass statements through a queue to the event loop, where they are sent to the DB. However, I have no idea how to do this with SQLAlchemy. Where would the entry point be? Objects need to be pickled when they are passed through a queue; how do I get such an object from an SQA query? The communication with the database has to work asynchronously in order not to block the event loop. Can I use e.g. aiomysql as a database driver for SQA?
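To make the idea concrete, here is a rough sketch of the dispatcher side; everything in it is illustrative, and bridging the process boundary between the workers and this coroutine is exactly the part I am unsure about:

# dispatcher sketch: workers would put plain SQL strings (picklable) on a
# queue; this coroutine on the event loop executes them via aiomysql
import aiomysql

async def db_dispatcher(queue, pool):
    while True:
        sql, params = await queue.get()  # statement shipped by a worker
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql, params)
                await conn.commit()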

roman asked Sep 21 '16


People also ask

Does SQLAlchemy close connection automatically?

The Engine.connect() method returns a Connection object, and by using it in a Python context manager (e.g. the with: statement) the Connection.close() method is automatically invoked at the end of the block.
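For example (the connection URI is a placeholder):

from sqlalchemy import create_engine, text

engine = create_engine("mysql://user:pass@host/db")  # placeholder URI
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
# conn.close() was invoked automatically when the with block ended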

Does SQLAlchemy use connection pool?

SQLAlchemy includes several connection pool implementations which integrate with the Engine. They can also be used directly for applications that want to add pooling to an otherwise plain DBAPI approach.
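For example, a sketch of pooling a plain DBAPI connection directly (the connection parameters are placeholders):

from sqlalchemy.pool import QueuePool
import pymysql

def creator():
    return pymysql.connect(host="localhost", user="user",
                           password="pass", db="db")  # placeholder credentials

pool = QueuePool(creator, pool_size=5, max_overflow=10)
conn = pool.connect()  # checks a connection out of the pool
conn.close()           # returns it to the pool instead of closing it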

How do I close SQLAlchemy connection in python?

You call close(), as documented. dispose() is not needed; in fact, calling dispose() explicitly is virtually never needed in normal SQLAlchemy usage.

Is SQLAlchemy engine thread safe?

Every pool implementation in SQLAlchemy is thread-safe, including the default QueuePool. This means that two threads requesting a connection simultaneously will check out two different connections. By extension, an engine will also be thread-safe.


2 Answers

Your requirement of one database connection per process-pool process can easily be satisfied if some care is taken in how you instantiate the session in the worker processes, assuming you are working with the ORM.

A simple solution would be to have a global session which you reuse across requests:

# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine))

And on the worker task:

# task.py
from db import DBSession

def task():
    DBSession.begin()  # each task gets its own transaction over the global connection
    ...
    DBSession.query(...)
    ...
    DBSession.close()  # cleanup on task end

The pool_size and max_overflow arguments customize the default QueuePool used by create_engine. pool_size=1 together with max_overflow=0 ensures that each process in the process pool keeps at most one connection alive.

If you want the session to reconnect, you can use DBSession.remove(), which will remove the session from the registry and make it reconnect on the next DBSession usage. You can also use the recycle argument of Pool (pool_recycle when passed through create_engine()) to make the connection reconnect after the specified amount of time.
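As a sketch, continuing the db.py example above (the recycle value is arbitrary):

# force a fresh session (and connection checkout) on next use:
DBSession.remove()

# or recycle connections by age:
engine = create_engine(
    "connection_uri",
    pool_size=1,
    max_overflow=0,
    pool_recycle=3600,  # reconnect connections older than one hour
)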

During development/debugging you can use AssertionPool, which will raise an exception if more than one connection is checked out from the pool; see switching pool implementations on how to do that.
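For example:

from sqlalchemy import create_engine
from sqlalchemy.pool import AssertionPool

engine = create_engine("connection_uri", poolclass=AssertionPool)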

olokki answered Oct 16 '22


@roman: Nice challenge you have there.

I have been in a similar scenario before, so here are my 2 cents: unless this consumer only "reads" and "writes" the message without doing any real processing of it, you could redesign it as a consumer/producer that consumes the message, processes it, and then puts the result in another queue. That queue (of processed messages, say) could be read by 1..N non-pooled asynchronous processes, each of which opens its DB connection once for its entire life cycle; a sketch follows below.
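A rough sketch of the writer side of that topology (the queue wiring, table, and statement are all illustrative):

# writer process: owns one DB connection for its whole life cycle
from sqlalchemy import create_engine, text

def writer_process(result_queue):
    engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
    conn = engine.connect()  # opened once, lives as long as the process
    while True:
        result = result_queue.get()  # processed message from the second queue
        conn.execute(text("INSERT INTO results (body) VALUES (:body)"),
                     {"body": result})  # hypothetical table and statement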

I can extend my answer, but I don't know if this approach fits your needs; if so, I can give you more detail about the extended design.

Rafael Aguilar answered Oct 16 '22