
Create DB connection and maintain on multiple processes (multiprocessing)

This is similar to another post I made; it answers that post and raises a new question.

Recap: I need to update every record in a spatial database, where I have a data set of points that overlays a data set of polygons. For each point feature I want to assign a key relating it to the polygon feature it lies within. So if my point 'New York City' lies within the polygon USA, and the USA polygon has 'gid = 1', I will assign 'gid_fkey = 1' to my point New York City.
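In SQL terms, that per-record assignment is a PostGIS point-in-polygon update. A minimal sketch of what one such update might look like on its own, using the table and column names from the code below and psycopg2's parameter substitution (the city_id value here is just a hypothetical example):

import psycopg2

# sketch only: update a single city row with the gid of the country polygon it lies within
conn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
cur = conn.cursor()

city_id = 42  # hypothetical example id

cur.execute(
    'UPDATE city SET gid_fkey = gid FROM country '
    'WHERE ST_Within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) '
    'AND city_id = %s',
    (city_id, city_id))

conn.commit()
cur.close()
conn.close()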

Okay, so this has been achieved using multiprocessing. I have noticed a 150% increase in speed using this, so it does work. But I think there is a bunch of unnecessary overhead, as one DB connection is required for each record.

So here is the code:

import multiprocessing, time, psycopg2


class Consumer(multiprocessing.Process):
    # worker process: keeps pulling tasks until it receives the None sentinel

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self):
        # every task opens its own DB connection -- the overhead in question
        pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        pyConn.set_isolation_level(0)
        pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a

        return self.a

    def __str__(self):
        return 'ARC'

    def run(self):
        print 'IN'


if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # two consumers per core
    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()

    pyCursorX.execute('SELECT count(*) FROM city WHERE gid_fkey IS NULL')
    temp = pyCursorX.fetchall()
    num_job = temp[0]
    num_jobs = num_job[0]

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
    cityIdListTuple = pyCursorX.fetchall()

    cityIdList = []

    for x in cityIdListTuple:
        cityIdList.append(x[0])

    for i in xrange(num_jobs):
        tasks.put(Task(cityIdList[i]))

    # one None sentinel per consumer tells the workers to stop
    for i in xrange(num_consumers):
        tasks.put(None)

    while num_jobs:
        result = results.get()
        print result
        num_jobs -= 1

It looks to be between 0.3 and 1.5 seconds per connection, as I have measured it with the 'time' module.
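For what it's worth, a measurement along those lines might look something like this (a rough sketch, reusing the connection string from the code above):

import time, psycopg2

# time how long a single connection takes to open
start = time.time()
conn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
print 'connection took %.2f seconds' % (time.time() - start)
conn.close()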

Is there a way to make one DB connection per process and then just pass the city_id in as a variable that I can feed into a query on that open connection's cursor? That way I would make, say, four processes, each with its own DB connection, and then somehow drop each city_id in for it to process.

asked Sep 26 '11 by EnE_




1 Answer

Try to isolate the creation of your connection in the Consumer constructor, then pass it to the executed Task:

import multiprocessing, time, psycopg2


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        self.pyConn.set_isolation_level(0)

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break
            answer = next_task(connection=self.pyConn)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self, connection=None):
        pyConn = connection
        pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a

        return self.a

    def __str__(self):
        return 'ARC'

    def run(self):
        print 'IN'
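One possible refinement, not shown in the answer above: Consumer.__init__ runs in the parent process when the consumers are created, so each connection is opened before start() forks its worker. Because each connection is only ever touched by the one child that inherits it this generally works, but a psycopg2 connection is not really meant to be carried across a fork, so you may prefer to open it at the top of run() instead, inside the child itself. A sketch of that variant, reusing the same connection string:

import multiprocessing, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.pyConn = None  # opened later, inside the child process

    def run(self):
        # connect here so the socket is created after the fork, in the worker itself
        self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        self.pyConn.set_isolation_level(0)
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                self.task_queue.task_done()
                break
            answer = next_task(connection=self.pyConn)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        self.pyConn.close()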
answered Oct 06 '22 by Cédric Julien