I am using JSON serializer helper function to easy access of dictionary(basically received as JSON) objects.
jsondict.py
"""Utilities for working with JSON and json-like structures - deeply nested Python dicts and lists
This lets us iterate over child nodes and access elements with a dot-notation.
"""
import sys
isPy3 = sys.version_info[0]==3
if isPy3:
def __alt_str__(v,enc='utf8'):
return v if isinstance(v,bytes) else v.encode(enc)
__strTypes__ = (str,bytes)
else:
__alt_str__ = unicode
__strTypes__ = (str,unicode)
class MyLocals(object):
pass
mylocals = MyLocals()
def setErrorCollect(collect):
mylocals.error_collect = collect
setErrorCollect(False)
def errorValue(x):
if isinstance(x,__strTypes__):
return repr(x) if ' ' in x else x
return 'None' if x is None else str(x)
def condJSON(v,__name__=''):
return JSONDict(v,__name__=__name__) if isinstance(v,dict) else JSONList(v,__name__=__name__) if isinstance(v,list) else v
def condJSONSafe(v,__name__=''):
return JSONDictSafe(v,__name__=__name__) if isinstance(v,dict) else JSONListSafe(v,__name__=__name__) if isinstance(v,list) else v
class JSONListIter(object):
def __init__(self, lst, conv):
self.lst = lst
self.i = -1
self.conv = conv
def __iter__(self):
return self
def next(self):
if self.i<len(self.lst)-1:
self.i += 1
return self.conv(self.lst[self.i])
else:
raise StopIteration
if isPy3:
__next__ = next
del next
class JSONList(list):
def __init__(self,v,__name__=''):
list.__init__(self,v)
self.__name__ = __name__
def __getitem__(self,x):
return condJSON(list.__getitem__(self,x),__name__='%s\t%s'%(self.__name__,errorValue(x)))
def __iter__(self):
return JSONListIter(self,condJSON)
class JSONListSafe(JSONList):
def __getitem__(self,x):
__name__='%s\t%s'%(self.__name__,errorValue(x))
try:
return condJSONSafe(list.__getitem__(self,x),__name__=__name__)
except:
if mylocals.error_collect:
mylocals.error_collect(__name__)
return JSONStrSafe('')
def __iter__(self):
return JSONListIter(self,condJSONSafe)
class JSONStrSafe(str):
def __getattr__(self, attr):
return self
__getitem__ = __getattr__
class JSONDict(dict):
"Allows dotted access"
def __new__(cls,*args,**kwds):
__name__ = kwds.pop('__name__')
self = dict.__new__(cls,*args,**kwds)
self.__name__ = __name__
return self
def __init__(self,*args,**kwds):
kwds.pop('__name__','')
dict.__init__(self,*args,**kwds)
def __getattr__(self, attr, default=None):
if attr in self:
return condJSON(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif __alt_str__(attr) in self:
return condJSON(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif attr=='__safe__':
return JSONDictSafe(self,__name__=self.__name__)
else:
raise AttributeError("No attribute or key named '%s'" % attr)
def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
if accept or reject:
if not accept:
f = lambda i: not reject(i)
elif not reject:
f = accept
else: #both
f = lambda i: accept(i) and not reject(i)
return sorted(((k,condJSON(v,__name__==k)) for k,v in self.iteritems() if f((k,v))))
else:
return sorted(((k,condJSON(v,__name__==k)) for k,v in self.iteritems()))
def sorted_keys(self):
return sorted(self.keys())
class JSONDictSafe(JSONDict):
"Allows dotted access"
def __getattr__(self, attr, default=None):
if attr in self:
return condJSONSafe(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif __alt_str__(attr) in self:
return condJSONSafe(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
elif attr=='__safe__':
return self
else:
return JSONStrSafe('')
def __getitem__(self,x):
__name__='%s\t%s'%(self.__name__,errorValue(x))
try:
return condJSONSafe(dict.__getitem__(self,x),__name__=__name__)
except KeyError:
if mylocals.error_collect:
mylocals.error_collect(__name__)
return JSONStrSafe('')
def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
if accept or reject:
if not accept:
f = lambda i: not reject(i)
elif not reject:
f = accept
else: #both
f = lambda i: accept(i) and not reject(i)
return sorted(((k,condJSONSafe(v,__name__==k)) for k,v in self.iteritems() if f((k,v))))
else:
return sorted(((k,condJSONSafe(v,__name__==k)) for k,v in self.iteritems()))
If JSON object passed like below.
data = {'name': 'john', 'age': 20, 'address': {'city':'xyz', 'country':'XZ', 'zip': 1223}}
json_obj = condJSONSafe(data)
I am able to access data with dot notation.
print(json_obj.name) --> john
print(json_obj.address.country) --> XZ
It was working well until I implementing multiprocessing in my code to improve the performance.
I have extracted a certain number of data from JSON (after made it as dot notation accessible data with the above helper function) and store it into separate lists, like list a,b,c.
And then, I passed into multiprocessing threads,
with mp.Pool(processes=mp.cpu_count()) as pool:
res = pool.starmap(self.process_records, zip(self.a, self.b, self.c))
pool.join()
end up with
TypeError: 'JSONStrSafe' object is not callable
I tried this answer, but it does not work for me. Appreciate your help. Thanks in advance.
EDIT: reproduce example:
test.py
import jsondict
import multiprocessing as mp
import itertools
def process_records(data, metadata):
print(data.name)
print(metadata)
#code to requirment
if __name__ == '__main__':
data = {
"metadata": "test_data",
"cust_list": [
{
'name': 'john',
'age': 20,
'address': {
'city':'xyz',
'country':'XZ',
'zip': 1223
}
},
{
'name': 'michal',
'age': 25,
'address': {
'city':'abc',
'country':'CX',
'zip': 3435
}
},
{
'name': 'david',
'age': 30,
'address': {
'city':'mnl',
'country':'TD',
'zip': 6767
}
}
]
}
json_obj = jsondict.condJSONSafe(data)
print(json_obj.metadata) #will print 'test_data'
print(json_obj.cust_list[0].name) #will print 'john'
print(json_obj.cust_list[2].address.city) #will print 'mnl'
with mp.Pool(processes=mp.cpu_count()) as pool:
res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
#res = pool.map(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) --> not working
#res = [pool.apply_async(process_records, d, json_obj.metadata) for d in json_obj.cust_list] --> not working
#apply --> not working
pool.join()
Output:
test_data
john
mnl
Traceback (most recent call last):
File "c:/Users/mohanlal/Desktop/Mock/json_err/test_app.py", line 53, in <module>
res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 268, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 608, in get
raise self._value
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 385, in _handle_tasks
put(task)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: 'JSONStrSafe' object is not callable
Tried with startmap, map, apply_async, apply, getting the same error for all.
I have tried with solution given in similar question attached link above. Modified as below where this error raised.
import re
dunder_pattern = re.compile("__.*__")
protected_pattern = re.compile("_.*")
classJSONStrSafe(str):
def__getattr__(self, attr):
if dunder_pattern.match(attr) or protected_pattern.match(attr):
return super().__getattr__(attr)
return self
def__getstate__(self): returnself.__dict__
def__setstate__(self, d): self.__dict__.update(d)
__getitem__ = __getattr__
But issue persists.
As suggested in the comments, I changed in all 3 places for getattr and tried. Getting different error as below
Process SpawnPoolWorker-1:
Traceback (most recent call last):
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
return _ForkingPickler.loads(res)
File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
__name__ = kwds.pop('__name__')
Process SpawnPoolWorker-2:
Process SpawnPoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
KeyError: '__name__'
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
return _ForkingPickler.loads(res)
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
self.run()
File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
__name__ = kwds.pop('__name__')
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
KeyError: '__name__'
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
return _ForkingPickler.loads(res)
File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
__name__ = kwds.pop('__name__')
KeyError: '__name__'
The problem is you are in a "pickle". Forgive the pun -- you have a pickle problem. When you are doing multiprocessing, the arguments to your worker functions/methods are pickled. Usually, the defaults used to serialize and de-serialize states are OK, but not in your case. See Pickling Class Instances. The default save and load operations for serializing and de-serializing an object are:
def save(obj):
return (obj.__class__, obj.__dict__)
def load(cls, attributes):
obj = cls.__new__(cls)
obj.__dict__.update(attributes)
return obj
Note that when de-serializing the object the object's __init__
method is not called but its __new__
method is, and therein lies the problem. I had to modify your __new__
method of class JSONDict
to try to recognize that it was being called by de-serialization and therefore '__name__'
may not be present among the keyword arguments and then had to add to that class customized __getstate__
and __setstate__
methods to override the default way it saves and restores the object's attributes (method __init__
remains unmodified):
class JSONDict(dict):
"Allows dotted access"
def __new__(cls,*args,**kwds):
self = dict.__new__(cls,*args,**kwds)
if kwds and '__name__' in kwds:
__name__ = kwds.pop('__name__')
self.__name__ = __name__
return self
def __init__(self,*args,**kwds):
kwds.pop('__name__','')
dict.__init__(self,*args,**kwds)
def __getstate__(self):
return self.__dict__
def __setstate__(self, d):
self.__dict__ = d
""" The other methods remain unmodified """
Prints:
test_data
john
mnl
john
test_data
michal
david
test_data
test_data
Update
I was scratching my head figuring out why it should be necessary to provide the __getstate__
and __setstate__
pickle methods since what they are doing should be the default action anyway. If you modify the program just to test the pickling without even running the Pool
methods by inserting the following line:
json_obj = condJSONSafe(data)
# insert this line:
import pickle; print(pickle.dumps(json_obj)); sys.exit(0)
It prints:
Traceback (most recent call last):
File "test.py", line 205, in <module>
import pickle; print('pickle'); print(pickle.dumps(json_obj)); sys.exit(0)
TypeError: 'JSONStrSafe' object is not callable
After adding a print statement in the right place, it became clear that the problem was in the __getattr__
method of class JSONDictSafe
. When pickle
checks to see if the class implements methods __getstate__
and __setstate__
, when there are no implementations __getattr__
is ultimately called and returns as the default value for these attributes a JSONStrSafe
instance. So instead of providing these attributes by defining these methods as I have done, one can alternatively add a simple check as follows:
class JSONDictSafe(JSONDict):
"Allows dotted access"
def __getattr__(self, attr, default=None):
if attr in ('__getstate__', '__setstate__'):
raise AttributeError(f'Missing attribute: {attr}')
""" rest of the method is unmodified """
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With