Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

JSON serialized object gives error with multiprocessing calls - TypeError: XXX objects not callable error

I am using a JSON helper module to make dictionary objects (basically received as JSON) easy to access.

jsondict.py

"""Utilities for working with JSON and json-like structures - deeply nested Python dicts and lists

This lets us iterate over child nodes and access elements with a dot-notation.
"""
import sys
# True when the running interpreter is Python 3.
isPy3 = sys.version_info[0] == 3
if not isPy3:
    # Python 2: the alternate spelling of a key is its unicode form.
    __strTypes__ = (str, unicode)
    __alt_str__ = unicode
else:
    # Python 3: the alternate spelling of a text key is its encoded bytes form.
    __strTypes__ = (str, bytes)

    def __alt_str__(v, enc='utf8'):
        """Return *v* unchanged if it is bytes, otherwise *v* encoded with *enc*."""
        if isinstance(v, bytes):
            return v
        return v.encode(enc)

class MyLocals(object):
    """Plain attribute holder for module-wide settings."""


# Single shared settings object read by the "safe" container classes.
mylocals = MyLocals()


def setErrorCollect(collect):
    """Store *collect* as ``mylocals.error_collect``.

    The safe containers call ``mylocals.error_collect(path)`` on a failed
    lookup when the stored value is truthy; a falsy value disables that call.
    """
    setattr(mylocals, 'error_collect', collect)


# Error collection is switched off by default.
setErrorCollect(False)

def errorValue(x):
    """Render one path component *x* for use in an error-path name.

    String-like values that contain a space are returned as their ``repr`` so
    the space stays visible; other string-like values are returned unchanged.
    ``None`` becomes ``'None'`` and any other value is passed through ``str``.
    """
    if isinstance(x, __strTypes__):
        # On Python 3 a bytes value has to be searched with a bytes needle:
        # ``' ' in b'...'`` raises TypeError there. On Python 2 ``bytes`` is
        # ``str``, so this picks the same needle as before.
        space = b' ' if isinstance(x, bytes) else ' '
        return repr(x) if space in x else x
    return 'None' if x is None else str(x)
def condJSON(v,__name__=''):
    """Wrap *v* for dotted access when it is a container.

    A dict becomes a ``JSONDict`` and a list becomes a ``JSONList``, each
    tagged with *__name__*; any other value is returned unchanged.
    """
    if isinstance(v, dict):
        return JSONDict(v, __name__=__name__)
    if isinstance(v, list):
        return JSONList(v, __name__=__name__)
    return v

def condJSONSafe(v,__name__=''):
    """Wrap *v* in the non-raising container types when it is a container.

    A dict becomes a ``JSONDictSafe`` and a list becomes a ``JSONListSafe``,
    each tagged with *__name__*; any other value is returned unchanged.
    """
    if isinstance(v, dict):
        return JSONDictSafe(v, __name__=__name__)
    if isinstance(v, list):
        return JSONListSafe(v, __name__=__name__)
    return v

class JSONListIter(object):
    """Iterator over a list that passes each element through a converter."""

    def __init__(self, lst, conv):
        self.lst, self.conv = lst, conv
        # Index of the element handed out most recently; -1 before the first.
        self.i = -1

    def __iter__(self):
        return self

    def next(self):
        """Advance by one and return the converted element, or raise StopIteration."""
        upcoming = self.i + 1
        if upcoming >= len(self.lst):
            raise StopIteration
        self.i = upcoming
        return self.conv(self.lst[upcoming])

    if isPy3:
        # Python 3 spells the iterator protocol method ``__next__``.
        __next__ = next
        del next

class JSONList(list):
    """List whose elements are wrapped by ``condJSON`` when they are read.

    ``__name__`` holds this list's path label; item access extends it with a
    tab and the rendered index to label the returned child.
    """

    def __init__(self,v,__name__=''):
        super(JSONList, self).__init__(v)
        self.__name__ = __name__

    def __getitem__(self,x):
        raw = list.__getitem__(self, x)
        child_name = '%s\t%s' % (self.__name__, errorValue(x))
        return condJSON(raw, __name__=child_name)

    def __iter__(self):
        return JSONListIter(self, condJSON)

class JSONListSafe(JSONList):
    """``JSONList`` variant whose item access does not raise on a bad index.

    A failed lookup reports the element's path to ``mylocals.error_collect``
    (when that is truthy) and returns an empty ``JSONStrSafe``. Successful
    lookups are wrapped with ``condJSONSafe``.
    """

    def __getitem__(self,x):
        __name__='%s\t%s'%(self.__name__,errorValue(x))
        try:
            raw = list.__getitem__(self,x)
        except (IndexError, TypeError):
            # IndexError: index out of range. TypeError: index is not an
            # integer or slice. Anything else (e.g. KeyboardInterrupt) is
            # allowed to propagate instead of being silently swallowed.
            if mylocals.error_collect:
                mylocals.error_collect(__name__)
            return JSONStrSafe('')
        return condJSONSafe(raw,__name__=__name__)

    def __iter__(self):
        return JSONListIter(self,condJSONSafe)

class JSONStrSafe(str):
    """String that returns itself for any subscript or missing attribute.

    This lets a chain such as ``value.a.b['c']`` keep evaluating to the same
    string. The fallback applies to every attribute name that normal lookup
    does not find, whatever the name looks like.
    """

    def __getitem__(self, key):
        return self

    # Attribute fallback shares the subscript behaviour.
    __getattr__ = __getitem__


class JSONDict(dict):
    """Dict that allows dotted access to its keys.

    ``__name__`` holds this dict's path label; attribute access extends it
    with a tab and the rendered key to label the returned child.
    """
    def __new__(cls,*args,**kwds):
        # ``__name__`` is mandatory here: ``pop`` without a default raises
        # KeyError when the keyword is absent.
        __name__ = kwds.pop('__name__')
        self = dict.__new__(cls,*args,**kwds)
        self.__name__ = __name__
        return self

    def __init__(self,*args,**kwds):
        # Drop the label so it is not stored as a regular key.
        kwds.pop('__name__','')
        dict.__init__(self,*args,**kwds)

    def __getattr__(self, attr, default=None):
        """Resolve a missing attribute as a key lookup.

        Tries *attr* itself, then its alternate string form, and wraps the
        value with ``condJSON``. ``__safe__`` returns a ``JSONDictSafe`` view
        of this dict. Any other name raises AttributeError.
        """
        if attr in self:
            return condJSON(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif __alt_str__(attr) in self:
            return condJSON(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif attr=='__safe__':
            return JSONDictSafe(self,__name__=self.__name__)
        else:
            raise AttributeError("No attribute or key named '%s'" % attr)

    def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
        """Return the ``(key, wrapped value)`` pairs sorted, optionally filtered.

        *accept* and *reject* are predicates over the raw ``(key, value)``
        pair. A pair is kept when *accept* (if given) is true for it and
        *reject* (if given) is false for it. Each kept value is wrapped with
        ``condJSON`` and labelled with its key.
        """
        if accept and reject:
            keep = lambda i: accept(i) and not reject(i)
        elif accept:
            keep = accept
        elif reject:
            keep = lambda i: not reject(i)
        else:
            keep = lambda i: True
        # ``items()`` exists on both Python 2 and 3 (``iteritems()`` does not
        # exist on 3), and the label is passed as a keyword rather than as
        # the result of a ``==`` comparison.
        return sorted((k,condJSON(v,__name__=k)) for k,v in self.items() if keep((k,v)))

    def sorted_keys(self):
        """Return the keys in sorted order."""
        return sorted(self.keys())

class JSONDictSafe(JSONDict):
    """``JSONDict`` variant that answers failed lookups with an empty string.

    Missing keys and missing attributes yield an empty ``JSONStrSafe``
    instead of raising. The attribute fallback applies to every name that
    normal lookup does not find, whatever the name looks like.
    """
    def __getattr__(self, attr, default=None):
        """Resolve a missing attribute as a key lookup, never raising for it.

        Tries *attr* itself, then its alternate string form, and wraps the
        value with ``condJSONSafe``. ``__safe__`` returns this object. Any
        other name returns an empty ``JSONStrSafe``.
        """
        if attr in self:
            return condJSONSafe(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif __alt_str__(attr) in self:
            return condJSONSafe(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif attr=='__safe__':
            return self
        else:
            return JSONStrSafe('')

    def __getitem__(self,x):
        """Return the wrapped value for key *x*, or an empty ``JSONStrSafe``.

        On a missing key the path label is reported to
        ``mylocals.error_collect`` when that is truthy.
        """
        __name__='%s\t%s'%(self.__name__,errorValue(x))
        try:
            return condJSONSafe(dict.__getitem__(self,x),__name__=__name__)
        except KeyError:
            if mylocals.error_collect:
                mylocals.error_collect(__name__)
            return JSONStrSafe('')

    def sorted_items(self,accept=None, reject=lambda i: i[0]=='__name__'):
        """Return the ``(key, wrapped value)`` pairs sorted, optionally filtered.

        *accept* and *reject* are predicates over the raw ``(key, value)``
        pair. A pair is kept when *accept* (if given) is true for it and
        *reject* (if given) is false for it. Each kept value is wrapped with
        ``condJSONSafe`` and labelled with its key.
        """
        if accept and reject:
            keep = lambda i: accept(i) and not reject(i)
        elif accept:
            keep = accept
        elif reject:
            keep = lambda i: not reject(i)
        else:
            keep = lambda i: True
        # ``items()`` exists on both Python 2 and 3 (``iteritems()`` does not
        # exist on 3), and the label is passed as a keyword rather than as
        # the result of a ``==`` comparison.
        return sorted((k,condJSONSafe(v,__name__=k)) for k,v in self.items() if keep((k,v)))

If JSON object passed like below.

data = {'name': 'john', 'age': 20, 'address': {'city':'xyz', 'country':'XZ', 'zip': 1223}}

json_obj = condJSONSafe(data)

I am able to access data with dot notation.

print(json_obj.name) --> john
print(json_obj.address.country) --> XZ

It was working well until I implemented multiprocessing in my code to improve performance.

I extract some of the data from the JSON (after making it accessible with dot notation via the helper module above) and store it in separate lists, such as lists a, b and c.

Then I pass these lists to a multiprocessing pool:

with mp.Pool(processes=mp.cpu_count()) as pool:
    res = pool.starmap(self.process_records, zip(self.a, self.b, self.c))
pool.join()

and end up with:

TypeError: 'JSONStrSafe' object is not callable

I tried this answer, but it does not work for me. Appreciate your help. Thanks in advance.

EDIT: reproduce example:

test.py

import jsondict
import multiprocessing as mp
import itertools

def process_records(data, metadata):
    """Print the record's ``name`` attribute, then the metadata value."""
    for line in (data.name, metadata):
        print(line)
    # code as per requirement


if __name__ == '__main__':
    # Sample payload: one metadata string plus a list of customer records,
    # each with a nested address dict.
    data = {
        "metadata": "test_data",
        "cust_list": [
            {
                'name': 'john', 
                'age': 20, 
                'address': {
                    'city':'xyz', 
                    'country':'XZ', 
                    'zip': 1223
                }
            },
                {
                'name': 'michal', 
                'age': 25, 
                'address': {
                    'city':'abc', 
                    'country':'CX', 
                    'zip': 3435
                }
            },
                {
                'name': 'david', 
                'age': 30, 
                'address': {
                    'city':'mnl', 
                    'country':'TD', 
                    'zip': 6767
                }
            }
        ]
    }

    # Wrap the plain dict so nested values can be read with dot notation.
    json_obj = jsondict.condJSONSafe(data)

    print(json_obj.metadata) #will print 'test_data'
    print(json_obj.cust_list[0].name) #will print 'john'
    print(json_obj.cust_list[2].address.city) #will print 'mnl'


    # Each task is one customer record paired with the shared metadata value;
    # the arguments are sent to the worker processes, which is where the
    # reported error appears.
    with mp.Pool(processes=mp.cpu_count()) as pool:
        res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
        #res = pool.map(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) --> not working
        #res = [pool.apply_async(process_records, d, json_obj.metadata) for d in json_obj.cust_list] --> not working
        #apply --> not working
    pool.join()

Output:

test_data
john
mnl
Traceback (most recent call last):
  File "c:/Users/mohanlal/Desktop/Mock/json_err/test_app.py", line 53, in <module>
    res = pool.starmap(process_records, zip(json_obj.cust_list, itertools.repeat(json_obj.metadata))) # --> not working
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 268, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 608, in get
    raise self._value
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 385, in _handle_tasks
    put(task)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: 'JSONStrSafe' object is not callable

I tried starmap, map, apply_async and apply, and got the same error with all of them.

I have tried the solution given in the similar question linked above, modifying the class where this error is raised as shown below.

import re
dunder_pattern = re.compile("__.*__")
protected_pattern = re.compile("_.*")

class JSONStrSafe(str):
    def __getattr__(self, attr):
        # Names starting with an underscore are not answered with ``self``.
        # ``str`` defines no ``__getattr__``, so the ``super()`` lookup below
        # itself raises AttributeError for them.
        if dunder_pattern.match(attr) or protected_pattern.match(attr):
            return super().__getattr__(attr)
        return self

    def __getstate__(self): return self.__dict__
    def __setstate__(self, d): self.__dict__.update(d)

    __getitem__ = __getattr__

But issue persists.

As suggested in the comments, I made the same change in all three places where __getattr__ is defined and tried again. I now get a different error, shown below:

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
    return _ForkingPickler.loads(res)
  File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
    __name__ = kwds.pop('__name__')
Process SpawnPoolWorker-2:
Process SpawnPoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
KeyError: '__name__'
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
    return _ForkingPickler.loads(res)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
    __name__ = kwds.pop('__name__')
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
KeyError: '__name__'
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 345, in get
    return _ForkingPickler.loads(res)
  File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py", line 89, in __new__
    __name__ = kwds.pop('__name__')
KeyError: '__name__'
like image 498
user1992 Avatar asked Jan 24 '21 21:01

user1992


1 Answers

The problem is you are in a "pickle". Forgive the pun -- you have a pickle problem. When you are doing multiprocessing, the arguments to your worker functions/methods are pickled. Usually, the defaults used to serialize and de-serialize states are OK, but not in your case. See Pickling Class Instances. The default save and load operations for serializing and de-serializing an object are:

def save(obj):
    # The saved state is the object's class together with its attribute dict.
    return (obj.__class__, obj.__dict__)

def load(cls, attributes):
    # The instance is created through __new__ with no arguments (__init__ is
    # not called), then the saved attributes are copied back onto it.
    obj = cls.__new__(cls)
    obj.__dict__.update(attributes)
    return obj

Note that when de-serializing the object the object's __init__ method is not called but its __new__ method is, and therein lies the problem. I had to modify your __new__ method of class JSONDict to try to recognize that it was being called by de-serialization and therefore '__name__' may not be present among the keyword arguments and then had to add to that class customized __getstate__ and __setstate__ methods to override the default way it saves and restores the object's attributes (method __init__ remains unmodified):

class JSONDict(dict):
    "Allows dotted access"
    def __new__(cls,*args,**kwds):
        self = dict.__new__(cls,*args,**kwds)
        # Only set the label when it was supplied, so a call without the
        # '__name__' keyword no longer raises KeyError.
        if kwds and '__name__' in kwds:
            __name__ = kwds.pop('__name__')
            self.__name__ = __name__
        return self

    def __init__(self,*args,**kwds):
        # Drop the label so it is not stored as a regular key.
        kwds.pop('__name__','')
        dict.__init__(self,*args,**kwds)

    # Explicit pickle hooks: save the instance attribute dict ...
    def __getstate__(self):
        return self.__dict__

    # ... and install it again on the restored instance.
    def __setstate__(self, d):
        self.__dict__ = d


    """ The other methods remain unmodified """

Prints:

test_data
john
mnl
john
test_data
michal
david
test_data
test_data

Update

I was scratching my head figuring out why it should be necessary to provide the __getstate__ and __setstate__ pickle methods since what they are doing should be the default action anyway. If you modify the program just to test the pickling without even running the Pool methods by inserting the following line:

json_obj = condJSONSafe(data)
# insert this line:
import pickle; print(pickle.dumps(json_obj)); sys.exit(0)

It prints:

Traceback (most recent call last):
  File "test.py", line 205, in <module>
    import pickle;  print('pickle'); print(pickle.dumps(json_obj)); sys.exit(0)
TypeError: 'JSONStrSafe' object is not callable

After adding a print statement in the right place, it became clear that the problem was in the __getattr__ method of class JSONDictSafe. When pickle checks to see if the class implements methods __getstate__ and __setstate__, when there are no implementations __getattr__ is ultimately called and returns as the default value for these attributes a JSONStrSafe instance. So instead of providing these attributes by defining these methods as I have done, one can alternatively add a simple check as follows:

class JSONDictSafe(JSONDict):
    "Allows dotted access"
    def __getattr__(self, attr, default=None):
        # Report the two pickle hook names as missing up front, before the
        # rest of the method gets a chance to supply a value for them.
        if attr in ('__getstate__', '__setstate__'):
            raise AttributeError(f'Missing attribute: {attr}')
        """ rest of the method is unmodified """
like image 79
Booboo Avatar answered Nov 18 '22 19:11

Booboo