
Multithreading with raw PyMySQL for Celery

In the project I am currently working on, I am not allowed to use an ORM, so I made my own.

It works great, but I am having problems with Celery and its concurrency. For a while I had it set to 1 (using --concurrency=1), but I'm adding new tasks that take longer to process than the interval at which celery beat schedules them, which causes a huge backlog of tasks.

When I set celery's concurrency to > 1, here's what happens (pastebin because it's big):

https://pastebin.com/M4HZXTDC

Any idea on how I could implement some kind of lock/wait on the other processes so that the different workers don't cross each other?

Edit: Here is where I set up my PyMySQL instance and how the open and close are handled

juleslasne asked Oct 23 '20 15:10



1 Answer

PyMySQL does not allow threads to share the same connection (the module can be shared, but threads cannot share a connection). Your Model class is reusing the same connection everywhere.

So, when different workers call on the models to do queries, they are using the same connection object, causing conflicts.

Make sure your connection objects are thread-local. Instead of having a db class attribute, consider a method that will retrieve a thread-local connection object, instead of reusing one potentially created in a different thread.

For instance, create your connection in the task.
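As a minimal sketch of creating the connection inside the task: the function below opens a connection, uses it, and closes it before returning, so nothing is shared between workers. The names connect and count_rows_task are hypothetical, and the standard-library sqlite3 module stands in for PyMySQL only so the sketch runs without a MySQL server. In the real project the connect call would be pymysql.connect(**database_config), and the function would be registered as a Celery task.

```python
import sqlite3
from contextlib import closing


def connect():
    # Stand-in (assumption): the real project would call
    # pymysql.connect(**database_config) here instead.
    return sqlite3.connect(":memory:")


def count_rows_task():
    """Hypothetical task body: open a connection, use it, close it."""
    # The connection is created inside the task, so it is never shared
    # with another worker thread or process.
    with closing(connect()) as conn:
        cur = conn.cursor()
        cur.execute("CREATE TABLE items (id INTEGER PRIMARY KEY)")
        # sqlite3 uses "?" placeholders; PyMySQL uses "%s".
        cur.executemany("INSERT INTO items (id) VALUES (?)", [(1,), (2,), (3,)])
        conn.commit()
        cur.execute("SELECT COUNT(*) FROM items")
        return cur.fetchone()[0]
```

Opening a connection per task costs one connect per call, which is the trade-off for not sharing any state between workers.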

Right now, you're using a global connection everywhere for every model.

# Connect to the database
connection = pymysql.connect(**database_config)


class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """

    db = connection

To avoid this, you can open the connection in the __init__ method instead:

class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """

    def __init__(self, *args, **kwargs):
        self.db = pymysql.connect(**database_config)

However, this may not be efficient or practical, because every Model instance will open its own connection.

To improve upon this, you could use an approach using threading.local to keep connections local to threads.



import threading

import pymysql


class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """
    _conn = threading.local()

    @property
    def db(self):
        # Lazily open one connection per thread (database_config as in the snippets above)
        if not hasattr(self._conn, 'db'):
            self._conn.db = pymysql.connect(**database_config)
        return self._conn.db
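To see what the thread-local property buys you, here is a small self-contained sketch. FakeConnection and collect_connections are hypothetical names used for illustration only; FakeConnection stands in for the object returned by pymysql.connect so the example runs without a database. Two Model instances in the same thread get the same connection, while each thread gets its own.

```python
import threading


class FakeConnection:
    """Stand-in for a PyMySQL connection (assumption, illustration only)."""


class Model:
    _conn = threading.local()

    @property
    def db(self):
        # Create the connection lazily, once per thread.
        if not hasattr(self._conn, "db"):
            self._conn.db = FakeConnection()
        return self._conn.db


def collect_connections(n_threads=3):
    """Return, per thread, the connections seen by two Model instances."""
    results = []
    lock = threading.Lock()

    def worker():
        first = Model().db
        second = Model().db  # a different instance in the same thread
        with lock:
            results.append((first, second))

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


pairs = collect_connections(3)
same_within_thread = all(a is b for a, b in pairs)
distinct_across_threads = len({id(a) for a, _ in pairs}) == len(pairs)
```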

Note that a thread-local solution assumes you're using a threading concurrency model, and that Celery uses multiple processes (the prefork pool) by default. This may or may not be a problem. If it is, you may be able to work around it by changing the workers to use eventlet instead.

sytech answered Oct 08 '22 07:10
