I'm building a Python module to extract tags from a large corpus of text, and while its results are high quality it executes very slowly. I'm trying to speed the process up by using multiprocessing, and that was working too, until I tried to introduce a lock so that only one process was connecting to our database at a time. I can't figure out for the life of me how to make this work - despite much searching and tweaking I am still getting a PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed. Here's the offending code - it worked fine until I tried to pass a lock object as an argument for f.
def make_network(initial_tag, max_tags = 2, max_iter = 3):
    manager = Manager()
    lock = manager.Lock()
    pool = manager.Pool(8)

    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a list
    # of strings (tags) as its sole argument, and returns a list of sets with entries
    # corresponding to the input list.
    f = partial(get_more_tags, max_tags = max_tags, lock = lock)

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print i + "|" + joined
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level+1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)
Your problem is that lock objects are not picklable. I can see two possible solutions for you in that case.
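The failure is easy to reproduce in isolation. Here is a minimal Python 3 sketch; try_pickle is a hypothetical helper written for this illustration, not part of any library. On Python 3 the exception types differ from the Python 2 PicklingError in the question, but the cause is the same: neither kind of lock can be serialized and sent to an already running worker.

```python
import multiprocessing
import pickle
import threading


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the exception raised."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return exc
    return None


if __name__ == "__main__":
    # Neither a thread lock nor a process lock survives pickling.
    for lock in (threading.Lock(), multiprocessing.Lock()):
        print(type(lock).__name__, "->", repr(try_pickle(lock)))
```

Everything pool.map sends to a worker, including the arguments bound into a partial, goes through pickle, which is why binding the lock into f triggers the error.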
To avoid this, you can make your lock variable a global variable. Your pool worker function can then reference it directly as a global, and you no longer have to pass it as an argument. This works when multiprocessing creates the pool processes with the OS fork mechanism (the traditional default on Linux), because fork copies the entire state of the creating process, including the global lock, into each pool process. The lock therefore has to be created before the pool. A multiprocessing.Lock cannot be sent to an existing pool process as a task argument, since that would require pickling it; it can only be shared through inheritance, that is, handed to a process at the moment the process is created. Incidentally, it is not necessary to use the Manager class just for this lock. With this change your code would look like this:
import multiprocessing
from functools import partial

lock = None  # Global definition of lock
pool = None  # Global definition of pool


def get_more_tags(tag, max_tags):
    # Placeholder for the real function. It must live at module level so that
    # pool.map can pickle a reference to it, and it must accept the arguments
    # that the partial below supplies.
    with lock:  # the global lock, inherited from the parent through fork
        pass  # the database query goes here
    return []  # placeholder result


def make_network(initial_tag, max_tags=2, max_iter=3):
    global lock
    global pool
    # Create the lock before the pool so that the forked workers inherit it.
    lock = multiprocessing.Lock()
    pool = multiprocessing.Pool(8)

    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a
    # list of strings (tags) as its sole argument, and returns a list of sets
    # with entries corresponding to the input list.
    f = partial(get_more_tags, max_tags=max_tags)

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print(i + "|" + joined)
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level + 1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)
In your real code, it is possible that the lock and pool variables might be class instance variables.
The second solution is to create a dedicated multiprocessing.Process and connect it via a multiprocessing.Queue to each of your pool processes. This process would be responsible for running your database queries. Your pool processes would use the queue to send query parameters to it. Since all the pool processes would share the same queue and a single process would service it, access to the database would automatically be serialized. The additional overhead would come from pickling and unpickling the query arguments and the query response. Note that a multiprocessing.Queue can be passed as an argument when a process is created (to multiprocessing.Process, or through the pool's initializer arguments) but, like a lock, not as an argument of a task submitted with pool.map; a Manager().Queue() proxy can be sent that way. Note also that the global-variable multiprocessing.Lock solution would not work on Windows, where processes are not created with fork semantics.
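The dedicated query process could be wired up roughly as follows. This is a Python 3 sketch under several assumptions: run_query stands in for the real database call, every other name is hypothetical, and each pool worker gets a private reply queue so that answers cannot be mixed up. The queues reach the workers through the pool initializer rather than as task arguments.

```python
import multiprocessing
from functools import partial

_request_q = None  # queue shared by all workers, carries (worker_id, tag)
_reply_q = None    # this worker's private reply queue
_worker_id = None


def run_query(tag):
    """Hypothetical stand-in for the real database query."""
    return tag.upper()


def db_server(request_q, reply_queues):
    """Runs in its own process and answers requests strictly one at a time."""
    while True:
        request = request_q.get()
        if request is None:  # sentinel: shut down
            break
        worker_id, tag = request
        reply_queues[worker_id].put(run_query(tag))


def init_worker(request_q, reply_queues, id_q):
    """Pool initializer: claim a worker id and remember the matching queues."""
    global _request_q, _reply_q, _worker_id
    _worker_id = id_q.get()
    _request_q = request_q
    _reply_q = reply_queues[_worker_id]


def get_more_tags(tag, max_tags):
    """Sketch of the expensive function; only the query goes via the queue."""
    _request_q.put((_worker_id, tag))
    row = _reply_q.get()
    return [row + str(n) for n in range(max_tags)]


def find_tags(tags, max_tags=2, processes=4):
    request_q = multiprocessing.Queue()
    reply_queues = [multiprocessing.Queue() for _ in range(processes)]
    id_q = multiprocessing.Queue()
    for worker_id in range(processes):
        id_q.put(worker_id)
    server = multiprocessing.Process(target=db_server,
                                     args=(request_q, reply_queues))
    server.start()
    try:
        with multiprocessing.Pool(processes, initializer=init_worker,
                                  initargs=(request_q, reply_queues, id_q)) as pool:
            return pool.map(partial(get_more_tags, max_tags=max_tags), tags)
    finally:
        request_q.put(None)  # tell the server to stop
        server.join()


if __name__ == "__main__":
    print(find_tags(["python", "locks", "queues"]))
```

The sketch assumes the pool never replaces a worker (no maxtasksperchild, no crashes), since a replacement would find no id left to claim. No lock is needed here: the single server process reads one request at a time, which is what serializes database access.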