I have a reactor that fetches messages from a RabbitMQ broker and triggers worker methods to process these messages in a process pool. This is implemented using Python asyncio, loop.run_in_executor() and concurrent.futures.ProcessPoolExecutor, something like this:
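(A minimal sketch of the setup; fetch_message() is only a stand-in for the actual RabbitMQ consumption.)

import asyncio
from concurrent.futures import ProcessPoolExecutor

def process_message(body):
    # synchronous worker method, runs in a pool process
    ...

async def fetch_message():
    # placeholder for the actual RabbitMQ consumer
    ...

async def reactor(loop, executor):
    while True:
        body = await fetch_message()
        # hand the message off to the pool without blocking the loop
        loop.run_in_executor(executor, process_message, body)

loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor()
loop.run_until_complete(reactor(loop, executor))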
Now I want to access the database in the worker methods using SQLAlchemy. Mostly the processing will consist of very straightforward and quick CRUD operations.
The reactor will process 10-50 messages per second in the beginning, so it is not acceptable to open a new database connection for every request. Rather, I would like to maintain one persistent connection per process.
My questions are: How can I do this? Can I just store the connection in a global variable? Will the SQA connection pool handle this for me? How do I clean up when the reactor stops?
[Update]
Why choose this pattern with a process pool?
The current implementation uses a different pattern where each consumer runs in its own thread. Somehow this does not work very well. There are already about 200 consumers, each running in its own thread, and the system is growing quickly. To scale better, the idea was to separate concerns: consume messages in an I/O loop and delegate the processing to a pool. Of course, the performance of the whole system is mainly I/O bound. However, CPU is an issue when processing large result sets.
The other reason was "ease of use." While the connection handling and consumption of messages is implemented asynchronously, the code in the worker can be synchronous and simple.
Soon it became evident that accessing remote systems through persistent network connections from within the worker is an issue. This is what the CommunicationChannels are for: inside the worker, I can submit requests to the message bus through these channels.
One of my current ideas is to handle DB access in a similar way: pass statements through a queue to the event loop, where they are sent to the DB. However, I have no idea how to do this with SQLAlchemy. Where would be the entry point? (A rough sketch of the plumbing I have in mind follows after these questions.)
Objects need to be pickled when they are passed through a queue. How do I get such an object from an SQA query?
The communication with the database has to work asynchronously in order not to block the event loop. Can I use e.g. aiomysql as a database driver for SQA?
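To make the queue idea concrete, here is a rough, hypothetical sketch of the plumbing (run_statement and the payload format are made-up names; the SQLAlchemy part is exactly what I don't know how to fill in):

import asyncio
from multiprocessing import Manager

manager = Manager()
db_queue = manager.Queue()  # shared between the loop process and the workers

def worker(message):
    # runs in the process pool; instead of talking to the DB directly,
    # it pushes a statement description onto the shared queue
    db_queue.put(("insert_message", {"body": message}))

def run_statement(op, params):
    # placeholder: this is where SQLAlchemy would have to come in
    ...

async def db_writer(loop):
    while True:
        # db_queue.get() blocks, so it is offloaded to a thread
        op, params = await loop.run_in_executor(None, db_queue.get)
        await loop.run_in_executor(None, run_statement, op, params)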
The connect() method returns a Connection object, and by using it in a Python context manager (e.g. the with statement), the Connection.close() method is automatically invoked at the end of the block.
SQLAlchemy includes several connection pool implementations which integrate with the Engine. They can also be used directly for applications that want to add pooling to an otherwise plain DBAPI approach.
You call close(), as documented. dispose() is not needed; in fact, calling dispose() explicitly is virtually never needed for normal SQLAlchemy usage.
Every pool implementation in SQLAlchemy is thread safe, including the default QueuePool. This means that two threads requesting a connection simultaneously will check out two different connections. By extension, an engine will also be thread-safe.
Your requirement of one database connection per process-pool process can easily be satisfied if some care is taken in how you instantiate the session in the worker processes, assuming you are working with the ORM.
A simple solution would be to have a global session which you reuse across requests:
# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine))
And on the worker task:
# task.py
from db import DBSession

def task():
    DBSession.begin()  # each task will get its own transaction over the global connection
    ...
    DBSession.query(...)
    ...
    DBSession.close()  # cleanup on task end
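Since db.py is imported separately in each pool process (and the connection is only created on first use), every worker process ends up with its own engine and its own single pooled connection. Dispatching from the reactor could then look something like this (a sketch reusing the names from the question; it assumes no connection is checked out before the pool processes are created):

import asyncio
from concurrent.futures import ProcessPoolExecutor

from task import task

loop = asyncio.get_event_loop()
executor = ProcessPoolExecutor()

# runs task() in one of the pool processes, which reuses
# that process's single pooled connection
loop.run_in_executor(executor, task)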
The pool_size and max_overflow arguments customize the default QueuePool used by create_engine. pool_size=1 will make sure that each process in the process pool keeps only one connection alive.
If you want it to reconnect you can use DBSession.remove(), which will remove the session from the registry and make it reconnect on the next DBSession usage. You can also use the recycle argument of Pool to make the connection reconnect after the specified amount of time.
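When creating the engine as above, the same setting can be passed through create_engine as pool_recycle (the URI and interval here are placeholders):

from sqlalchemy import create_engine

engine = create_engine(
    "connection_uri",
    pool_size=1,
    max_overflow=0,
    pool_recycle=3600,  # recycle the connection after one hour
)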
During development/debugging you can use AssertionPool, which will raise an exception if more than one connection is checked out from the pool; see Switching Pool Implementations for how to do that.
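A minimal sketch of that switch (the connection URI is a placeholder):

from sqlalchemy import create_engine
from sqlalchemy.pool import AssertionPool

# raises if more than one connection is ever checked out at a time
engine = create_engine("connection_uri", poolclass=AssertionPool)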
@roman: Nice challenge you have there.
I have been in a similar scenario before, so here are my 2 cents: unless this consumer only "reads" and "writes" the message, without doing any real processing of it, you could redesign it as a consumer/producer that consumes the message, processes it, and puts the result into another queue. That queue (of processed messages, say) could then be read by 1..N non-pooled asynchronous processes, each of which keeps its DB connection open for its entire life cycle.
I can extend my answer, but I don't know if this approach fits your needs; if so, I can give you more detail about the extended design.