In the project I am currently working on, I am not allowed to use an ORM, so I made my own.
It works great, but I am having problems with Celery and its concurrency. For a while, I had it set to 1 (using `--concurrency=1`), but I'm adding new tasks that take longer to process than the interval at which celery beat schedules them, which causes a huge backlog of tasks.
When I set Celery's concurrency to > 1, here's what happens (pastebin because it's big):
https://pastebin.com/M4HZXTDC
Any idea how I could implement some kind of lock/wait on the other processes so that the different workers don't cross each other?
Edit: Here is where I set up my PyMySQL instance and how opening and closing the connection are handled
PyMySQL does not allow threads to share the same connection (the module can be shared, but threads cannot share a connection). Your Model class reuses the same connection everywhere.
So when different workers call the models to run queries, they all use the same connection object, which causes conflicts.
Make sure your connection objects are thread-local. Instead of having a `db` class attribute, consider a method that retrieves a thread-local connection object, rather than reusing one that may have been created in a different thread.
For instance, create your connection in the task.
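A minimal sketch of that idea, assuming a hypothetical task body called `count_rows_task`. It uses the standard-library `sqlite3` module purely as a stand-in for `pymysql.connect(**database_config)`, so the sketch runs without a MySQL server; in real code you would decorate the function with your Celery app's `@app.task` and connect to MySQL instead. The point is only the lifetime: the connection is opened, used and closed inside one task call, so no other worker can ever touch it.

```python
import sqlite3
from contextlib import closing


def connect():
    # Stand-in (assumption) for pymysql.connect(**database_config);
    # sqlite3 is used only so this sketch runs without a MySQL server.
    return sqlite3.connect(":memory:")


def count_rows_task(values):
    """Hypothetical task body: the connection belongs to this call only."""
    # closing() guarantees conn.close() runs, even if a query raises
    with closing(connect()) as conn:
        conn.execute("CREATE TABLE items (value INTEGER)")
        conn.executemany("INSERT INTO items VALUES (?)", [(v,) for v in values])
        (count,) = conn.execute("SELECT COUNT(*) FROM items").fetchone()
    return count


print(count_rows_task([1, 2, 3]))  # 3
```

Opening a connection per task call costs a connect round-trip each time, which is the trade-off the per-thread approach further down tries to avoid.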
Right now, you're using a global connection everywhere for every model.
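```python
import pymysql

# database_config (host, user, password, db, ...) is defined elsewhere

# Connect to the database once, at import time
connection = pymysql.connect(**database_config)


class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """
    db = connection  # one connection object shared by every Model
```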
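To see the sharing concretely, here is a small standard-library-only sketch. `connect()` is a hypothetical stand-in for `pymysql.connect(**database_config)` that simply returns a fresh object; what matters is that every Model instance, in every thread, ends up holding the very same connection.

```python
import threading

created = []  # every "connection" the factory has handed out


def connect():
    # Hypothetical stand-in for pymysql.connect(**database_config)
    conn = object()
    created.append(conn)
    return conn


connection = connect()  # module level: runs once, at import time


class Model(object):
    db = connection  # class attribute: shared by all instances


seen = []


def worker():
    seen.append(Model().db)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(created), all(c is connection for c in seen))  # 1 True
```

With Celery's default prefork pool, something similar can happen across processes: a connection opened at import time in the parent may be inherited by every forked child, which then all talk over the same socket.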
To avoid this, you can create the connection in the `__init__` method instead:
```python
import pymysql


class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """
    def __init__(self, *args, **kwargs):
        # Each Model instance opens its own, private connection
        self.db = pymysql.connect(**database_config)
```
However, this may not be efficient or practical, because every Model instance opens a new database connection.
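A quick illustration of that cost, again with a hypothetical `connect()` stand-in for `pymysql.connect(**database_config)`: each instance triggers its own connect call, so creating many short-lived model objects means many separate connections to the server.

```python
created = []  # every "connection" the factory has handed out


def connect():
    # Hypothetical stand-in for pymysql.connect(**database_config)
    conn = object()
    created.append(conn)
    return conn


class Model(object):
    def __init__(self, *args, **kwargs):
        self.db = connect()  # a brand-new connection per instance


models = [Model() for _ in range(3)]
print(len(created))  # 3
```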
To improve on this, you could use `threading.local` to keep connections local to threads:
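```python
import threading

import pymysql


class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """
    # Shared by all instances, but each thread sees its own attributes on it
    _conn = threading.local()

    @property
    def db(self):
        # Lazily open one connection per thread, then reuse it in that thread
        if not hasattr(self._conn, 'db'):
            self._conn.db = pymysql.connect(**database_config)
        return self._conn.db
```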
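The behaviour of the thread-local version can be checked without a database. This sketch swaps `pymysql.connect` for a hypothetical `connect()` stand-in and runs four threads, each creating two Model instances: instances in the same thread share a connection, while different threads get different ones.

```python
import threading

created = []
_created_lock = threading.Lock()


def connect():
    # Hypothetical stand-in for pymysql.connect(**database_config)
    conn = object()
    with _created_lock:
        created.append(conn)
    return conn


class Model(object):
    _conn = threading.local()

    @property
    def db(self):
        if not hasattr(self._conn, 'db'):
            self._conn.db = connect()
        return self._conn.db


results = []


def worker():
    a, b = Model(), Model()
    # Same thread: both instances should get the same connection
    results.append((a.db, b.db))


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(created))  # 4: one connection per thread
```

Because `_conn` lives on the base class, every subclass inherits the same `threading.local` object, so all models in a given thread share that thread's single connection.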
Note that a thread-local solution assumes you're using a threading concurrency model. Note also that Celery uses multiple processes (the prefork pool) by default. This may or may not be a problem. If it is, you may be able to work around it by switching the workers to the eventlet pool instead (for example, with `--pool=eventlet`).
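If you stay on the prefork pool, one common precaution (an addition here, not part of the answer above) is to remember which process opened the cached connection and open a fresh one whenever the code finds itself in a different process. This matters because the thread that calls `fork()` keeps its thread-local values in the child. `connect()` is again a hypothetical stand-in for `pymysql.connect(**database_config)`; this is a sketch, not a drop-in fix.

```python
import os
import threading


def connect():
    # Hypothetical stand-in for pymysql.connect(**database_config)
    return object()


class Model(object):
    _conn = threading.local()

    @property
    def db(self):
        # Reuse the cached connection only if this process opened it.
        # The forking thread's thread-local values survive into the child,
        # so a child could otherwise pick up the parent's connection.
        if getattr(self._conn, 'pid', None) != os.getpid():
            # The inherited connection is deliberately not closed here:
            # its socket is still shared with the parent process.
            self._conn.db = connect()
            self._conn.pid = os.getpid()
        return self._conn.db
```

Within a single process this behaves exactly like the previous version: one connection per thread, reused on every access.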