
Can't pickle psycopg2.extensions.connection objects when using pool.imap, but can be done in individual processes

I am trying to build an application that "checks out" a cell (a square covering a part of land in a geographic database) and performs an analysis of the features within that cell. Since I have many cells to process, I am using a multiprocessing approach.

I had it somewhat working inside of my object like this:

import multiprocessing as mp
import psycopg2 as pg2
from time import sleep


class DistributedGeographicConstraintProcessor:

    ...

    def _process_cell(self, conn_string):

        conn = pg2.connect(conn_string)
        try:
            cur = conn.cursor()

            cell_id = self._check_out_cell(cur)
            conn.commit()
            print(f"processing cell_id {cell_id}...")

            for constraint in self.constraints:
                # print(f"processing {constraint.name()}...")
                query = constraint.prepare_distributed_query(self.job, self.grid)
                cur.execute(query, {
                    "buffer": constraint.buffer(),
                    "cell_id": cell_id,
                    "name": constraint.name(),
                    "simplify_tolerance": constraint.simplify_tolerance()
                })

            # TODO: do a final race condition check to further suppress duplicates
            self._check_in_cell(cur, cell_id)
            conn.commit()

        finally:
            cur.close()
            conn.close()

        return None

    def run(self):

        while True:
            if not self._job_finished():
                params = [self.conn_string] * self.num_cores
                processes = []
                for param in params:
                    process = mp.Process(target=self._process_cell, args=(param,))
                    processes.append(process)
                sleep(0.1)  # Prevent multiple processes from checking out the same grid square
                    process.start()
                for process in processes:
                    process.join()
            else:
                self._finalize_job()
                break

But the problem is that it will only start four processes and wait until they all finish before starting four new processes.

I want to make it so when one process finishes its work, it will begin working on the next cell immediately, even if its co-processes are not yet finished.

I am unsure about how to implement this and I have tried using a pool like this:

def run(self):

    pool = mp.Pool(self.num_cores)
    unprocessed_cells = self._unprocessed_cells()
    for i in pool.imap(self._process_cell, unprocessed_cells):
        print(i)

But this just tells me that the connection cannot be pickled:

TypeError: can't pickle psycopg2.extensions.connection objects

But I do not understand why, because I am passing the exact same function, with the same inputs, to imap as I was using as the Process target.

I have already looked at these threads, here is why they do not answer my question:

  • Error Connecting To PostgreSQL can't pickle psycopg2.extensions.connection objects - The answer here only indicates that multiple processes cannot share the same connection. I am aware of this, and I am initializing the connection inside the function that is executed in the child process. Also, as I mentioned, it works when I map the function to individual Process instances with the same inputs.
  • Multiprocessing result of a psycopg2 request. “Can't pickle psycopg2.extensions.connection objects” - There are no answers or comments on that question, and the code is not intact anyway: the author references a function that is not specified in the question, and in any case they are clearly trying to share the same cursor between processes.
wfgeo asked Oct 15 '25


1 Answer

My guess is that you're attaching some connection object to self. pool.imap has to pickle self._process_cell to send it to the workers, and pickling a bound method means pickling the instance it is bound to, together with every attribute on it; mp.Process avoids this on Linux because the child is forked and simply inherits the object without pickling. Try to rewrite your solution using functions only (no classes/methods).
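You can reproduce the failure without psycopg2 at all; in this minimal sketch the open file is just a stand-in for any unpicklable attribute on the instance:

    import pickle

    class Holder:
        def __init__(self):
            # stand-in for an unpicklable attribute, e.g. a database connection
            self.resource = open(__file__)

        def work(self, x):
            return x

    h = Holder()
    pickle.dumps(h.work)  # raises TypeError: cannot pickle '_io.TextIOWrapper' object

Pickling h.work forces pickling of h itself, which fails on the file object, exactly as your pool.imap call fails on the connection.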

Here is a simplified version of a single producer/multiple workers solution I used some time ago:

import time
from multiprocessing import Pool

NUM_PROC = 4  # number of worker processes


def worker(param):
    # connect to pg
    # do work
    pass


def main():
    pool = Pool(processes=NUM_PROC)
    tasks = []
    for param in params:  # params: the per-task arguments, defined elsewhere
        t = pool.apply_async(worker, args=(param,))
        tasks.append(t)
    pool.close()
    finished = False
    while not finished:
        finished = True
        for t in tasks:
            if not t.ready():
                finished = False
                break
        time.sleep(1)
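Applied to your code, the same idea with imap would look roughly like this (a sketch only; process_cell, unprocessed_cells, and the query logic are placeholders for your own versions). Because process_cell is a module-level function, only the connection string and cell id have to be pickled, and each worker opens its own connection:

    import multiprocessing as mp
    import psycopg2 as pg2

    def process_cell(args):
        conn_string, cell_id = args
        conn = pg2.connect(conn_string)
        try:
            cur = conn.cursor()
            # ... run the per-cell constraint queries here ...
            conn.commit()
        finally:
            conn.close()
        return cell_id

    def run(conn_string, unprocessed_cells, num_cores):
        args = [(conn_string, cell_id) for cell_id in unprocessed_cells]
        with mp.Pool(num_cores) as pool:
            # imap_unordered hands a new cell to a worker as soon as one is free
            for cell_id in pool.imap_unordered(process_cell, args):
                print(f"finished cell_id {cell_id}")

A side effect of this layout is that cells are assigned through the argument list instead of being checked out inside the worker, so the sleep(0.1) workaround against double checkout is no longer needed.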
Ionut Ticus answered Oct 19 '25