This is a follow-up to another post I made: it answers that question and raises a new one.
Recap: I need to update every record in a spatial database where a data set of points overlays a data set of polygons. For each point feature I want to assign a key relating it to the polygon feature it lies within. So if my point 'New York City' lies within the polygon USA, and the USA polygon has 'GID = 1', I will assign 'gid_fkey = 1' to my point New York City.
Okay, so this has been achieved using multiprocessing, and I have noticed a 150% increase in speed, so it does work. But I think there is a bunch of unnecessary overhead, as one DB connection is required for each record.
So here is the code:
import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # poison pill received, shut this consumer down
                print 'Tasks Complete'
                self.task_queue.task_done()
                break
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):

    def __init__(self, a):
        self.a = a

    def __call__(self):
        # a new connection is opened for every single record -- the overhead in question
        pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        pyConn.set_isolation_level(0)
        pyCursor1 = pyConn.cursor()
        procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)
        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a
        return self.a

    def __str__(self):
        return 'ARC'

    def run(self):
        print 'IN'

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()

    pyCursorX.execute('SELECT count(*) FROM city WHERE gid_fkey IS NULL')
    temp = pyCursorX.fetchall()
    num_job = temp[0]
    num_jobs = num_job[0]

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
    cityIdListTuple = pyCursorX.fetchall()

    cityIdList = []
    for x in cityIdListTuple:
        cityIdList.append(x[0])

    for i in xrange(num_jobs):
        tasks.put(Task(cityIdList[i]))

    # one poison pill per consumer so they all shut down
    for i in xrange(num_consumers):
        tasks.put(None)

    while num_jobs:
        result = results.get()
        print result
        num_jobs -= 1
Each connection looks to take between 0.3 and 1.5 seconds, as I have measured it with the 'time' module.
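A minimal sketch of how that can be measured, using the same connection string as in the code above:

import time, psycopg2

start = time.time()
pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
print 'connection took %.2f seconds' % (time.time() - start)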
Is there a way to make one DB connection per process and then just use the city_id info as a variable that I can feed into a query on that already-open cursor? That way I'd make, say, four processes, each with its own DB connection, and then drop each city_id in somehow to be processed.
Some background on multiprocessing first. Python's multiprocessing.Pool can be used for parallel execution of a function across multiple input values, distributing the input data across processes (data parallelism).
The join method blocks the execution of the main process until the process whose join method is called terminates; without join, the main process won't wait for the child process to finish.
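A minimal sketch of join with a hypothetical worker function:

import multiprocessing

def worker():
    print 'doing work'

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()   # the main process blocks here until worker() has finished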
While the Process class keeps every submitted process in memory, a Pool keeps only those currently under execution. So with a large number of tasks that each carry a lot of data, using the Process class can waste a lot of memory, although creating a Pool has more up-front overhead.
A Pool.map version can also be slow when the mapped function has to rebuild expensive state (such as a DB connection) on every call, because mapped functions are assumed to be stateless. In some cases this can be avoided by using the initializer argument to multiprocessing.Pool, which runs once in each worker process.
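Applied to this problem, a minimal sketch of that Pool-with-initializer approach might look as follows; init_worker and assign_gid are hypothetical names, while the query and connection string are taken from the question:

import multiprocessing, psycopg2

pyConn = None   # one connection per worker process, set by the initializer

def init_worker():
    # runs once in each worker process, right after it starts
    global pyConn
    pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConn.set_isolation_level(0)

def assign_gid(city_id):
    # reuses the worker's connection instead of opening a new one per record
    pyCursor = pyConn.cursor()
    pyCursor.execute('UPDATE city SET gid_fkey = gid FROM country '
                     'WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), '
                     'country.the_geom) AND city_id = %s', (city_id, city_id))
    return city_id

if __name__ == '__main__':
    city_ids = [1, 2, 3]     # placeholder: the ids from the question's SELECT
    pool = multiprocessing.Pool(processes=4, initializer=init_worker)
    for done in pool.imap_unordered(assign_gid, city_ids):
        print done
    pool.close()
    pool.join()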
Try to isolate the creation of your connection in the Consumer constructor, then pass it to each executed Task:
import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # one connection per consumer, opened once instead of once per record
        self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        self.pyConn.set_isolation_level(0)

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break
            # hand the already-open connection to the task
            answer = next_task(connection=self.pyConn)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):

    def __init__(self, a):
        self.a = a

    def __call__(self, connection=None):
        pyConn = connection
        pyCursor1 = pyConn.cursor()
        procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)
        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a
        return self.a

    def __str__(self):
        return 'ARC'

    def run(self):
        print 'IN'
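With that change, each consumer opens exactly one connection for its whole lifetime, and the driving code from the question can stay essentially the same. A condensed usage sketch (the ids are placeholders for the ones fetched by the question's SELECT):

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # four consumers means four connections in total, however many records there are
    num_consumers = 4
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()

    city_ids = [1, 2, 3]                 # placeholder: ids from the question's SELECT
    for city_id in city_ids:
        tasks.put(Task(city_id))
    for i in xrange(num_consumers):
        tasks.put(None)                  # one poison pill per consumer

    for i in xrange(len(city_ids)):
        print results.get()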