The code below doesn't seem to run concurrently, and I'm not sure exactly why:
import multiprocessing

def run_normalizers(config, debug, num_threads, name=None):
    def _run():
        print('Started process for normalizer')
        sqla_engine = init_sqla_from_config(config)
        image_vfs = create_s3vfs_from_config(config, config.AWS_S3_IMAGE_BUCKET)
        storage_vfs = create_s3vfs_from_config(config, config.AWS_S3_STORAGE_BUCKET)
        pp = PipedPiper(config, image_vfs, storage_vfs, debug=debug)
        if name:
            pp.run_pipeline_normalizers(name)
        else:
            pp.run_all_normalizers()
        print('Normalizer process complete')

    threads = []
    for i in range(num_threads):
        threads.append(multiprocessing.Process(target=_run))
    [t.start() for t in threads]
    [t.join() for t in threads]
run_normalizers(...)
The config variable is just a dictionary defined outside of the _run() function. All of the processes seem to be created, but it isn't any faster than if I do it with a single process. Basically, what happens in the run_*_normalizers() functions is: read from a queue table in a database (via SQLAlchemy), make a few HTTP requests, and then run a 'pipeline' of normalizers that modify the data and save it back into the database. I'm coming from JVM land, where threads are 'heavy' and routinely used for parallelism, so I'm a bit confused by this - I thought the multiprocessing module was supposed to get around the limitations of Python's GIL.
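For context, a quick way to verify that the work really lands in separate processes is to log each worker's PID and time a CPU-bound stand-in task; busy_work below is purely a made-up placeholder, not part of the real pipeline:

import multiprocessing
import os
import time

def busy_work(n):
    # CPU-bound stand-in task, just to see whether extra processes help
    print('running in pid %d' % os.getpid())
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    start = time.time()
    procs = [multiprocessing.Process(target=busy_work, args=(10 ** 6,)) for _ in range(4)]
    [p.start() for p in procs]
    [p.join() for p in procs]
    print('4 processes finished in %.2fs' % (time.time() - start))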
Fixed my multiprocessing problem - and actually switched to threads. I'm not sure what exactly fixed it, though - I just re-architected everything into workers and tasks, and now things are flying. Here's the basics of what I did:
import abc
from Queue import Empty, Queue
from threading import Thread


class AbstractTask(object):
    """
    The base task
    """
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def run_task(self):
        pass


class TaskRunner(object):

    def __init__(self, queue_size, num_threads=1, stop_on_exception=False):
        super(TaskRunner, self).__init__()
        self.queue = Queue(queue_size)
        self.execute_tasks = True
        self.stop_on_exception = stop_on_exception

        # create a worker
        def _worker():
            while self.execute_tasks:

                # get a task, blocking for up to a second so the loop
                # can notice when execute_tasks has been switched off
                task = None
                try:
                    task = self.queue.get(True, 1)
                except Empty:
                    continue

                # execute the task; an unhandled exception will also end
                # this worker thread once the finally block has run
                failed = True
                try:
                    task.run_task()
                    failed = False
                finally:
                    if failed and self.stop_on_exception:
                        print('Stopping due to exception')
                        self.execute_tasks = False
                    self.queue.task_done()

        # start threads
        for i in range(0, int(num_threads)):
            t = Thread(target=_worker)
            t.daemon = True
            t.start()

    def add_task(self, task, block=True, timeout=None):
        """
        Adds a task
        """
        if not self.execute_tasks:
            raise Exception('TaskRunner is not accepting tasks')
        self.queue.put(task, block, timeout)

    def wait_for_tasks(self):
        """
        Waits for tasks to complete
        """
        if not self.execute_tasks:
            raise Exception('TaskRunner is not accepting tasks')
        self.queue.join()
All I do is create a TaskRunner, add tasks to it (thousands of them), and then call wait_for_tasks(). So, obviously, in the re-architecture I 'fixed' some other problem that I had. Odd, though.
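For illustration, usage looks roughly like the sketch below; FetchAndNormalizeTask and its record_id argument are made-up placeholders, not part of the actual pipeline code:

class FetchAndNormalizeTask(AbstractTask):
    # hypothetical concrete task; run_task is where the real work would go
    def __init__(self, record_id):
        self.record_id = record_id

    def run_task(self):
        print('normalizing record %s' % self.record_id)


runner = TaskRunner(queue_size=1000, num_threads=8, stop_on_exception=False)
for record_id in range(10000):
    runner.add_task(FetchAndNormalizeTask(record_id))
runner.wait_for_tasks()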
If you are still looking for a multiprocessing solution, you might first want to check out how to use a pool of workers; then you wouldn't have to manage the num_threads processes yourself: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
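Roughly, that would look something like the following sketch; normalize_one and the list it maps over are placeholders for whatever unit of work your pipeline actually has:

import multiprocessing

def normalize_one(name):
    # placeholder for one unit of normalizer work
    return name.lower()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    results = pool.map(normalize_one, ['Foo', 'Bar', 'Baz'])
    pool.close()
    pool.join()
    print(results)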
As for the slowdown problem: have you tried passing the config object as an argument to the _run function instead of closing over it? I don't know whether or how this would change anything internally, but it's a guess that it might.
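A minimal, stripped-down sketch of what that might look like (the dummy config dict and the trimmed-down _run body are only there to make the example run on its own):

import multiprocessing

def _run(config):
    # the worker takes config as an explicit argument instead of closing over it
    print('worker got config with %d keys' % len(config))

def run_normalizers(config, num_threads):
    processes = [multiprocessing.Process(target=_run, args=(config,))
                 for _ in range(num_threads)]
    [p.start() for p in processes]
    [p.join() for p in processes]

if __name__ == '__main__':
    run_normalizers({'AWS_S3_IMAGE_BUCKET': 'images'}, num_threads=4)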