I tried to rewrite some csv-reading code to be able to run it on multiple cores in Python 3.2.2. I tried to use the Pool
object of multiprocessing, which I adapted from working examples (and already worked for me for another part of my project). I ran into an error message I found hard to decipher and troubleshoot.
The error:
Traceback (most recent call last):
File "parser5_nodots_parallel.py", line 256, in <module>
MG,ppl = csv2graph(r)
File "parser5_nodots_parallel.py", line 245, in csv2graph
node_chunks)
File "/Library/Frameworks/Python.framework/Versions/3.2/lib/python3.2/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/3.2/lib/python3.2/multiprocessing/pool.py", line 552, in get
raise self._value
AttributeError: __exit__
The relevant code:
import csv
import time
import datetime
import re
from operator import itemgetter
from multiprocessing import Pool
import itertools
def chunks(l,n):
"""Divide a list of nodes `l` in `n` chunks"""
l_c = iter(l)
while 1:
x = tuple(itertools.islice(l_c,n))
if not x:
return
yield x
def csv2nodes(r):
strptime = time.strptime
mktime = time.mktime
l = []
ppl = set()
pattern = re.compile(r"""[A-Za-z0-9"/]+?(?=[,\n])""")
for row in r:
with pattern.findall(row) as f:
cell = int(f[3])
id = int(f[2])
st = mktime(strptime(f[0],'%d/%m/%Y'))
ed = mktime(strptime(f[1],'%d/%m/%Y'))
# collect list
l.append([(id,cell,{1:st,2: ed})])
# collect separate sets
ppl.add(id)
return (l,ppl)
def csv2graph(source):
MG=nx.MultiGraph()
# Remember that I use integers for edge attributes, to save space! Dic above.
# start: 1
# end: 2
p = Pool()
node_divisor = len(p._pool)
node_chunks = list(chunks(source,int(len(source)/int(node_divisor))))
num_chunks = len(node_chunks)
pedgelists = p.map(csv2nodes,
node_chunks)
ll = []
ppl = set()
for l in pedgelists:
ll.append(l[0])
ppl.update(l[1])
MG.add_edges_from(ll)
return (MG,ppl)
with open('/Users/laszlosandor/Dropbox/peers_prisons/python/codetenus_test.txt','r') as source:
r = source.readlines()
MG,ppl = csv2graph(r)
What's a good way to troubleshoot this?
Use Pool. The multiprocessing pool starmap() function will call the target function with multiple arguments. As such it can be used instead of the map() function. This is probably the preferred approach for executing a target function in the multiprocessing pool that takes multiple arguments.
The process pool can be shutdown by calling the Pool. close() function. This will prevent the pool from accepting new tasks. Once all issued tasks are completed, the resources of the process pool, such as the child worker processes, will be released.
Python multiprocessing Pool can be used for parallel execution of a function across multiple input values, distributing the input data across processes (data parallelism). Below is a simple Python multiprocessing Pool example.
The multiprocesing module avoids the limitations of the Global Interpreter Lock (GIL) by using subprocesses instead of threads. The multiprocessed code does not execute in the same order as serial code. There is no guarantee that the first process to be created will be the first to complete.
The problem is in this line:
with pattern.findall(row) as f:
You are using the with
statement. It requires an object with __enter__
and __exit__
methods. But pattern.findall
returns a list
, with
tries to store the __exit__
method, but it can't find it, and raises an error. Just use
f = pattern.findall(row)
instead.
It is not the asker's problem in this instance but the first troubleshooting step for a generic "AttributeError: __exit__" should be making sure the brackets are there, e.g.
with SomeContextManager() as foo:
#works because a new object is referenced...
not
with SomeContextManager as foo:
#AttributeError because the class is referenced
Catches me out from time to time and I end up here -__-
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With