I recently started experimenting with multiprocessing to speed up a task. I created a script that does fuzzy string matching and calculates scores using different algorithms (I wanted to compare different matching techniques). You can find the full source here: https://bitbucket.org/bergonzzi/fuzzy-compare/src. As input it takes 2 files which are combined into pairs (each line of file1 with each line of file2). For each pair, fuzzy match scores are calculated.
I made 3 versions. Running with the sample data provided in my repo (which consists of 697,340 items after being combined into pairs), I have the following timings:
I'm trying to understand why my Pool.map() version is much faster than my Queue version, which is actually slower than the simple single-process one.
My reasoning for even attempting to use Queues is that the Pool.map() version holds on to the results until everything is finished and only writes to a file at the end. This means that for big files it ends up eating a lot of memory. I'm talking about this version (linking to it because it's a lot of code to paste here).
To solve this I refactored it into a producer/consumer pattern (or attempted to, at least). Here I first produce jobs by combining both input files and put them in a queue, which the consumers process (calculate fuzzy match scores). Finished jobs are put into an output queue. Then I have a single process grabbing finished items from this queue and writing them to a file. This way, in theory, I wouldn't need as much memory since results would be flushed out to disk as they come in. It seems to work fine, but it's much slower. I also noticed that the 4 processes I'm spawning don't seem to use 100% CPU when I look at Activity Monitor on OS X (which is not the case with the Pool.map() version).
Another thing I notice is that my producer function seems to fill up the queue properly but the consumer processes seem to wait until the queue is filled up instead of starting to work as soon as the first item arrives. I'm probably doing something wrong there...
For reference here's some of the relevant code for the Queue version (although it's better to look at the full code in the repo linked above).
Here's my producer function:
def combine(list1, list2):
    '''
    Combine every item of list1 with every item of list2,
    normalize the strings and put each pair in the job queue.
    '''
    pname = multiprocessing.current_process().name
    for x in list1:
        for y in list2:
            # slugify is a function to normalize the strings
            term1 = slugify(x.strip(), separator=' ')
            term2 = slugify(y.strip(), separator=' ')
            job_queue.put_nowait([term1, term2])
This is the writer function:
def writer(writer_queue):
    out = open(file_out, 'wb')
    pname = multiprocessing.current_process().name
    out.write(header)
    for match in iter(writer_queue.get, "STOP"):
        print("%s is writing %s") % (pname, str(match))
        line = str(';'.join(match) + '\n')
        out.write(line)
    out.close()
This is the worker function that does the actual calculations (stripped out most of the code since it doesn't make a difference here, full source on the repo):
def score_it(job_queue, writer_queue):
    '''Calculate scores for pair of words.'''
    pname = multiprocessing.current_process().name
    for pair in iter(job_queue.get_nowait, "STOP"):
        # do all the calculations and put the result into the writer queue
        writer_queue.put(result)
This is how I set up the processes:
# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()

workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []

print('Start matching with "%s", minimum score of %s and %s workers') % (
    args.algorithm, minscore, workers)

# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list))
c.start()
c.join()
print("Job queue size: %s") % job_queue.qsize()

# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()

for w in xrange(workers):
    p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
    p.start()
    processes.append(p)

job_queue.put("STOP")

for p in processes:
    p.join()

writer_queue.put("STOP")
I've read quite a bit here about multiprocessing sometimes being slower, and I know this has to do with the overhead of creating and managing new processes. Also, when the job to be done isn't "big" enough, the effect of multiprocessing might not be visible. However, in this case I think the job is quite big, and the Pool.map() version seems to prove it, because it's much faster.
Am I doing something really wrong when managing all these processes and passing over queue objects? How can this be optimised so that results can be written to a file as they are processed in order to minimise the amount of memory required while running it?
Thanks!
This is due to the Python GIL being the bottleneck that prevents threads from running truly concurrently. The best CPU utilisation is achieved with ProcessPoolExecutor (from concurrent.futures) or multiprocessing.Process, which sidestep the GIL by running the code in separate processes.
So, multiprocessing is faster when the program is CPU-bound. When a program does a lot of I/O, threading may be more efficient, because most of the time the program is just waiting for the I/O to complete. For CPU-bound work, however, multiprocessing is generally more efficient because the processes run truly in parallel.
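To make that concrete, here is a minimal sketch (my own illustration, not code from the question's repo) of handing CPU-bound work to a ProcessPoolExecutor; score_pair and the sample pairs are placeholders for the real matching function and data:

from concurrent.futures import ProcessPoolExecutor

def score_pair(pair):
    # placeholder for the real CPU-bound fuzzy-matching work
    term1, term2 = pair
    return abs(len(term1) - len(term2))

if __name__ == '__main__':
    pairs = [('foo', 'food'), ('bar', 'barn'), ('baz', 'bazaar')]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map() spreads the pairs across worker processes, so the GIL of
        # any single process is not a bottleneck for CPU-bound work
        for result in executor.map(score_pair, pairs):
            print(result)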
Yes, it is. From https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes: "Queues are thread and process safe."
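As a small illustration of that guarantee, a multiprocessing.Queue can be handed to several processes and written to by all of them at once; this toy example (names made up for illustration) drains the queue in the parent before joining the children:

import multiprocessing

def worker(q):
    # put() is safe to call from several processes at the same time
    q.put("hello from %s" % multiprocessing.current_process().name)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(4)]
    for p in procs:
        p.start()
    for _ in procs:
        # drain the queue before joining, one message per child process
        print(q.get())
    for p in procs:
        p.join()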
Use the multiprocessing pool if your tasks are independent. This means that each task does not depend on other tasks that could execute at the same time. It may also mean that each task depends on no data other than what is passed to it via its function arguments.
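For example, here is a hedged sketch of a pool handling independent tasks while streaming results to disk as workers finish them, rather than holding everything in memory (imap_unordered and the score_pair / out.csv names are my own illustration, not taken from the question or the answers above):

import multiprocessing

def score_pair(pair):
    # independent task: depends only on its arguments (placeholder "score")
    term1, term2 = pair
    return [term1, term2, str(abs(len(term1) - len(term2)))]

if __name__ == '__main__':
    list1 = ['foo', 'bar', 'baz']
    list2 = ['food', 'barn']
    pairs = [(x, y) for x in list1 for y in list2]
    pool = multiprocessing.Pool(processes=4)
    with open('out.csv', 'w') as out:
        # imap_unordered hands results back as workers finish them, so they
        # can be written to disk without buffering the whole result set
        for result in pool.imap_unordered(score_pair, pairs, chunksize=10):
            out.write(';'.join(result) + '\n')
    pool.close()
    pool.join()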
I think the issue with your timings is that your queue version is missing an optimization. You made a comment essentially saying that your job_queue fills up completely before the worker processes start taking jobs from it. I believe the reason for this is the c.join() you have under # Fill up job queue. It prevents the main process from continuing until the job queue is full. I'd move the c.join() to the end, after the p.join()'s. You'll also need to figure out a way to get your stop flags into the end of the queue. The combine function might be a good place to put this: something along the lines of adding one stop flag per worker after it has run out of data to combine.
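A rough sketch of that reordering (reusing the combine / score_it / writer functions and variables from the question, so it is not self-contained, and assuming score_it uses a blocking job_queue.get rather than get_nowait so that a momentarily empty queue doesn't raise Queue.Empty):

def combine(list1, list2, job_queue, workers):
    '''Produce the pairs, then one stop flag per consumer.'''
    for x in list1:
        for y in list2:
            job_queue.put([slugify(x.strip(), separator=' '),
                           slugify(y.strip(), separator=' ')])
    for _ in xrange(workers):
        job_queue.put("STOP")  # sentinel for each worker process

c = multiprocessing.Process(target=combine, name="Feeder",
                            args=(to_match, source_list, job_queue, workers))
c.start()  # no c.join() here, so the consumers can start right away

w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()

processes = []
for _ in xrange(workers):
    p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
    p.start()
    processes.append(p)

for p in processes:       # wait for the consumers...
    p.join()
c.join()                  # ...and the feeder...
writer_queue.put("STOP")  # ...then tell the writer to finish
w.join()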
One other thing to note: you're writing over your w variable within the scope of the for loop that kicks off the p processes. As a matter of style/readability/etc., I'd change w to a different variable name. If you're not using it, an underscore works as a good throwaway variable name. I.e.
for w in xrange(workers):
should become
for _ in xrange(workers):
Long story short, if you move the c.join() to the end, you should get more accurate timings. Currently, the only work that happens in parallel is the fuzzy matching of strings itself. One of the advantages of a producer/consumer design is that the consumer processes don't have to wait until the producer is finished, and thus you end up using less memory.