I would like to use a multiprocessing pool with an iterator in order to execute a function in a worker, splitting the iterator into chunks of N elements until the iterator is exhausted.
import arcpy
from multiprocessing import Pool

def insert(rows):
    with arcpy.da.InsertCursor("c:\temp2.gdb\test", fields=["*"]) as i_cursor:
        #i_cursor is an iterator
        for row in rows:
            i_cursor.insertRow(row)

input_rows = []
count = 0
pool = Pool(4)

with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
    #s_cursor is an iterator
    for row in s_cursor:
        if (count < 100):
            input_rows.append(row)
            count += 1
        else:
            #send 100 rows to the insert function in a new thread
            pool.apply_async(insert, input_rows)
            #reset count and input_rows
            count = 1
            input_rows = [row]

pool.join()
pool.close()
My question is: is this script the right way to do it? Is there a better way?
Something is probably wrong with the script, because I get the following AssertionError at pool.join():
Traceback (most recent call last):
File "G:\Maxime\truncate_append_pool.py", line 50, in <module>
pool.join()
File "C:\App\Python27\ArcGIS10.3\lib\multiprocessing\pool.py", line 460, in join
assert self._state in (CLOSE, TERMINATE)
AssertionError
If I have to guess what's primarily wrong with your code, I'd say it's in how you pass your input_rows to your process function insert() - the way multiprocessing.Pool.apply_async() works is to unpack the arguments passed to it, so your insert() function actually receives 100 arguments instead of one argument with a list of 100 elements. This causes an immediate error before your process function even gets a chance to start. If you change your call to pool.apply_async(insert, [input_rows]) it might start working... but then you'd also be defeating the purpose of iterators, and you might as well convert your whole input iterator into a list and feed slices of 100 to multiprocessing.Pool.map() and be done with it.
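To see the unpacking in isolation, here is a minimal stand-alone sketch (the count_args function and the sample rows are mine, standing in for insert() and the arcpy cursors):

import multiprocessing

def count_args(*args):
    # stand-in for insert(); just reports how many positional arguments arrived
    return len(args)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    rows = [("a", 1), ("b", 2), ("c", 3)]  # pretend these are three cursor rows
    # the list is unpacked, so count_args() receives three separate arguments
    print(pool.apply_async(count_args, rows).get())    # prints 3
    # wrapping it in another list passes the whole row list as one argument
    print(pool.apply_async(count_args, [rows]).get())  # prints 1
    pool.close()  # close() (or terminate()) has to come before join(),
    pool.join()   # otherwise join() raises the AssertionError from the traceback above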
But you asked if there is a 'better' way to do it. While 'better' is a relative category, in an ideal world multiprocessing.Pool comes with handy imap() (and imap_unordered()) methods intended to consume iterables and spread them over the selected pool in a lazy fashion (so no running over the whole iterator before processing), so all you need to build are your iterator slices and pass them to it for processing, i.e.:
import arcpy
import itertools
import multiprocessing

# a utility function to get us a slice of an iterator, as an iterator
# when working with iterators maximum laziness is preferred
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def insert(rows):
    with arcpy.da.InsertCursor("c:\temp2.gdb\test", fields=["*"]) as i_cursor:
        for row in rows:
            i_cursor.insertRow(row)

if __name__ == "__main__":  # guard for multi-platform use
    with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
            pass  # do whatever you want with your result (return from your process function)
        pool.close()  # all done, close cleanly
(btw, your code wouldn't give you the last slice for s_cursor sizes that are not multiples of 100)
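As a quick illustration of the helper (my own check, not part of the original answer), feeding a plain range through iterator_slice() shows the lazy chunking, including the short final chunk that the manual counting loop in the question drops:

for chunk in iterator_slice(range(8), 3):
    print(chunk)
# (0, 1, 2)
# (3, 4, 5)
# (6, 7)    <- the final, shorter slice is still produced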
But... it would be wonderful if it actually worked as advertised. While a lot of it has been fixed over the years, imap_unordered() will still take a large sample of your iterator (far larger than the actual number of pool processes) when producing its own iterator, so if that's a concern you'll have to get down and dirty yourself, and you're on the right track - apply_async() is the way to go when you want to control how to feed your pool; you just need to make sure you feed your pool properly:
if __name__ == "__main__":
    with arcpy.da.SearchCursor("c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        cursor_iterator = iterator_slice(s_cursor, 100)  # slicer from above, for convenience
        queue = []  # a queue for our current worker async results, a deque would be faster
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(insert, [next(cursor_iterator)]))
            except (StopIteration, TypeError):  # no more data, clear out the slice iterator
                cursor_iterator = None
            # wait for a free worker or until all remaining workers finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                process.wait(0.1)  # let it breathe a little, 100ms should be enough
                if not process.ready():  # a sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    # you can use process.get() to get the result if needed
                    pass
        pool.close()
And now your s_cursor iterator will be called only when the next 100 results are needed (when your insert() process function exits, cleanly or not).
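If you also want the returned values back, a self-contained sketch of the same feeding pattern that collects them via get() could look like this (sum_chunk and the plain range are stand-ins I made up so the example runs without arcpy):

import itertools
import multiprocessing

def iterator_slice(iterator, length):  # same helper as above
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def sum_chunk(values):
    # hypothetical stand-in for insert(); it returns something so there is a result to collect
    return sum(values)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    chunk_iterator = iterator_slice(range(20), 6)
    queue = []    # pending AsyncResult handles
    results = []  # collected return values, in completion order
    while chunk_iterator or queue:
        try:
            queue.append(pool.apply_async(sum_chunk, [next(chunk_iterator)]))
        except (StopIteration, TypeError):
            chunk_iterator = None
        while queue and (len(queue) >= pool._processes or not chunk_iterator):
            process = queue.pop(0)
            process.wait(0.1)
            if not process.ready():
                queue.append(process)
            else:
                results.append(process.get())  # get() returns sum_chunk()'s return value
    pool.close()
    print(results)  # e.g. [15, 51, 87, 37], order depends on completion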
UPDATE - The previously posted code had a bug in clearing out the queue at the end when a captured result is desired; this version should do the job nicely. We can easily test it with some mock functions:
import random
import time

# just an example generator to prove lazy access by printing when it generates
def get_counter(limit=100):
    for i in range(limit):
        if not i % 3:  # print every third generation to reduce verbosity
            print("Generated: {}".format(i))
        yield i

# our process function, just prints what's passed to it and waits for 1-6 seconds
def test_process(values):
    time_to_wait = 1 + random.random() * 5
    print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
    time.sleep(time_to_wait)
    print("Processed: {}".format(values))
Now we can intertwine them like:
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)  # let's use just 2 workers
    count = get_counter(30)  # get our counter iterator set to iterate from 0-29
    count_iterator = iterator_slice(count, 7)  # we'll process them in chunks of 7
    queue = []  # a queue for our current worker async results, a deque would be faster
    while count_iterator or queue:
        try:
            # add our next slice to the pool:
            queue.append(pool.apply_async(test_process, [next(count_iterator)]))
        except (StopIteration, TypeError):  # no more data, clear out the slice iterator
            count_iterator = None
        # wait for a free worker or until all remaining workers finish
        while queue and (len(queue) >= pool._processes or not count_iterator):
            process = queue.pop(0)  # grab a process response from the top
            process.wait(0.1)  # let it breathe a little, 100ms should be enough
            if not process.ready():  # a sub-process has not finished execution
                queue.append(process)  # add it back to the queue
            else:
                # you can use process.get() to get the result if needed
                pass
    pool.close()
And the result is (of course, it will differ from system to system):
Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)
This proves that our generator/iterator is used to collect data only when there's a free slot in the pool to do the work, ensuring minimal memory usage (and/or I/O pounding, if your iterators ultimately do that). You won't get it much more streamlined than this. The only additional, albeit marginal, speed-up you can obtain is to reduce the wait time (but your main process will then eat more resources) and to increase the allowed queue size (at the expense of memory), which is locked to the number of processes in the above code - if you use while queue and (len(queue) >= pool._processes + 3 or not count_iterator): it will load 3 more iterator slices, reducing latency in situations when a process ends and a slot in the pool frees up.