
Correctly use ThreadPool with Generators

I am having trouble using a ThreadPool with a generator to process a CSV file in Python 2.7. Here is some sample code that illustrates the problem:

from multiprocessing.dummy import Pool as ThreadPool
import time

def getNextBatch():
    # Reads lines from a huge CSV and yields them as required.
    for i in range(5):
        yield i

def processBatch(batch):
    # This simulates a slow network request that happens.
    time.sleep(1)
    print "Processed Batch " + str(batch)

# We use 4 threads to attempt to alleviate the bottleneck caused by network I/O.
threadPool = ThreadPool(processes = 4)

batchGenerator = getNextBatch()

for batch in batchGenerator:
    threadPool.map(processBatch, (batch,))

threadPool.close()
threadPool.join()

When I run this, I get the expected output:

Processed Batch 0
Processed Batch 1
Processed Batch 2
Processed Batch 3
Processed Batch 4

The problem is that the lines appear one at a time, with a 1-second delay between each print. Effectively, my script is running sequentially instead of using multiple threads as I intended.

The goal is for all of the printed statements to appear after about 1 second, rather than one per second for 5 seconds.
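To confirm that the loop, not the thread pool itself, is what serializes the work, here is a minimal timing sketch of the same pattern. The names get_next_batch, process_batch and run_loop_pattern are hypothetical stand-ins (the CSV reader and the network request are replaced by a counter and time.sleep), and the delay is shortened to keep the run quick. It avoids the with statement because Pool only became a context manager in Python 3.3.

```python
from functools import partial
from multiprocessing.dummy import Pool as ThreadPool
import time


def get_next_batch(n):
    # Hypothetical stand-in for the CSV reader: yields batch ids 0..n-1.
    for i in range(n):
        yield i


def process_batch(batch, delay):
    # Hypothetical stand-in for the slow network request.
    time.sleep(delay)
    return batch


def run_loop_pattern(n=5, delay=0.2, workers=4):
    """Time the question's pattern: one map() call per batch."""
    pool = ThreadPool(processes=workers)
    start = time.time()
    try:
        for batch in get_next_batch(n):
            # map() does not return until every item in its iterable is done.
            # The iterable here holds a single batch, so the loop cannot
            # submit the next batch until this one has finished.
            pool.map(partial(process_batch, delay=delay), (batch,))
    finally:
        pool.close()
        pool.join()
    return time.time() - start


elapsed = run_loop_pattern()
print("loop of map() calls: %.2f s" % elapsed)  # at least n * delay
```

Even with 4 worker threads available, the total time grows with the number of batches, because only one batch is ever in flight.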

asked Nov 17 '17 at 17:11 by Loic Verrall
1 Answer

Here's your problem:

for batch in batchGenerator:
    threadPool.map(processBatch, (batch,))

When I tried

threadPool.map(processBatch, batchGenerator)

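it worked as expected: the batches ran concurrently, although they were not necessarily printed in order. map() blocks until every item in the iterable it was given has been processed. Inside your for loop, each call receives a one-element tuple, so the pool processes a single batch, waits for it to finish, and only then moves on to the next one. That is sequential execution, no matter how many threads the pool has.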
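A minimal sketch of the single-map() fix, using the same hypothetical stand-ins (get_next_batch, process_batch, run_single_map) with a shortened delay. Two details are worth knowing. First, map() converts an iterable without a length (such as a generator) into a list before dispatching, so the whole CSV generator is consumed up front. Second, with 4 threads and 5 batches the pool needs two rounds, so the original example would finish in about 2 seconds rather than 1; getting everything done in about 1 second requires at least as many threads as batches.

```python
from functools import partial
from multiprocessing.dummy import Pool as ThreadPool
import time


def get_next_batch(n):
    # Hypothetical stand-in for the CSV reader.
    for i in range(n):
        yield i


def process_batch(batch, delay):
    # Hypothetical stand-in for the slow network request.
    time.sleep(delay)
    print("Processed Batch %d" % batch)
    return batch


def run_single_map(n=5, delay=0.2, workers=4):
    """Hand the whole generator to one map() call."""
    pool = ThreadPool(processes=workers)
    start = time.time()
    try:
        # One call: the pool spreads all batches across its worker threads.
        # map() turns the generator into a list first, so every batch is
        # read before any work starts.
        results = pool.map(partial(process_batch, delay=delay), get_next_batch(n))
    finally:
        pool.close()
        pool.join()
    return results, time.time() - start


results, elapsed = run_single_map()
print(results)
print("single map() call: %.2f s" % elapsed)
```

The returned list is in input order even though the "Processed Batch" lines are printed in whatever order the threads finish.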

answered Sep 30 '22 at 10:09 by e.s.
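If you would rather keep the original for loop (for example, to do other per-batch work while reading), a different technique is to submit each batch with apply_async() and wait for the results only after the loop. This is a sketch using the same hypothetical stand-ins, with run_apply_async also hypothetical. Like map(), it does not throttle reading: each batch is queued as soon as the generator yields it.

```python
from multiprocessing.dummy import Pool as ThreadPool
import time


def get_next_batch(n):
    # Hypothetical stand-in for the CSV reader.
    for i in range(n):
        yield i


def process_batch(batch, delay):
    # Hypothetical stand-in for the slow network request.
    time.sleep(delay)
    return batch


def run_apply_async(n=5, delay=0.2, workers=4):
    """Keep the for loop, but submit each batch without waiting for it."""
    pool = ThreadPool(processes=workers)
    start = time.time()
    try:
        pending = []
        for batch in get_next_batch(n):
            # apply_async() returns an AsyncResult immediately, so the loop
            # moves on to the next batch while the worker threads run.
            pending.append(pool.apply_async(process_batch, (batch, delay)))
        # Block only once everything has been submitted; get() also
        # re-raises any exception raised inside a worker.
        results = [r.get() for r in pending]
    finally:
        pool.close()
        pool.join()
    return results, time.time() - start


results, elapsed = run_apply_async()
print(results)
print("apply_async() loop: %.2f s" % elapsed)
```

Calling get() inside the loop instead of after it would reintroduce the one-batch-at-a-time behavior from the question.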