Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

TypeError: can't pickle _thread._local objects when using dask on pandas DataFrame

I have a huge DataFrame which I want to process using dask in order to save time. The problem is that I run into the error `TypeError: can't pickle _thread._local objects` as soon as it starts running. Can someone help me?

I have written a function that processes the data stored in the DataFrame row by row, and I apply it with:

out = df_query.progress_apply(lambda row: run(row), axis=1)

and it runs fine.

Since this takes a lot of time, I started using dask:

ddata = dd.from_pandas(df_query, npartitions=3)
out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')

The problem is that as soon as the processing starts I get this error (after a huge Traceback, see below): TypeError: can't pickle _thread._local objects

The run(...) function does some data manipulation, including queries to a DB.

Here is the complete Traceback:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-14-aefae1f00437> in <module>
----> 1 out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, **kwargs)
    190                            get_id=_process_get_id, dumps=dumps, loads=loads,
    191                            pack_exception=pack_exception,
--> 192                            raise_exception=reraise, **kwargs)
    193     finally:
    194         if cleanup:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    447             # Seed initial tasks into the thread pool
    448             while state['ready'] and len(state['running']) < num_workers:
--> 449                 fire_task()
    450 
    451             # Main loop, wait on tasks to finish, insert new ones

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in fire_task()
    441                 # Submit
    442                 apply_async(execute_task,
--> 443                             args=(key, dumps((dsk[key], data)),
    444                                   dumps, loads, get_id, pack_exception),
    445                             callback=queue.put)

~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in _dumps(x)
     24 
     25 def _dumps(x):
---> 26     return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
     27 
     28 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
    950     try:
    951         cp = CloudPickler(file, protocol=protocol)
--> 952         cp.dump(obj)
    953         return file.getvalue()
    954     finally:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
    265         self.inject_addons()
    266         try:
--> 267             return Pickler.dump(self, obj)
    268         except RuntimeError as e:
    269             if 'recursion' in e.args[0]:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in dump(self, obj)
    435         if self.proto >= 4:
    436             self.framer.start_framing()
--> 437         self.save(obj)
    438         self.write(STOP)
    439         self.framer.end_framing()

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.
    773             if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.
    773             if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    636         else:
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)
    640 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    784         write(MARK)
    785         for element in obj:
--> 786             save(element)
    787 
    788         if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    885                 k, v = tmp[0]
    886                 save(k)
--> 887                 save(v)
    888                 write(SETITEM)
    889             # else tmp is empty, and we're done

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    784         write(MARK)
    785         for element in obj:
--> 786             save(element)
    787 
    788         if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
    769         if n <= 3 and self.proto >= 2:
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.
    773             if id(obj) in memo:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
    814 
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 
    818     dispatch[list] = save_list

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
    838                 write(MARK)
    839                 for x in tmp:
--> 840                     save(x)
    841                 write(APPENDS)
    842             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
    814 
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 
    818     dispatch[list] = save_list

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
    838                 write(MARK)
    839                 for x in tmp:
--> 840                     save(x)
    841                 write(APPENDS)
    842             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    393                 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
    394                 or themodule is None):
--> 395             self.save_function_tuple(obj)
    396             return
    397         else:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    592         if hasattr(func, '__qualname__'):
    593             state['qualname'] = func.__qualname__
--> 594         save(state)
    595         write(pickle.TUPLE)
    596         write(pickle.REDUCE)  # applies _fill_function on the tuple

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    885                 k, v = tmp[0]
    886                 save(k)
--> 887                 save(v)
    888                 write(SETITEM)
    889             # else tmp is empty, and we're done

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    393                 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
    394                 or themodule is None):
--> 395             self.save_function_tuple(obj)
    396             return
    397         else:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    592         if hasattr(func, '__qualname__'):
    593             state['qualname'] = func.__qualname__
--> 594         save(state)
    595         write(pickle.TUPLE)
    596         write(pickle.REDUCE)  # applies _fill_function on the tuple

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
    393                 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
    394                 or themodule is None):
--> 395             self.save_function_tuple(obj)
    396             return
    397         else:

~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
    592         if hasattr(func, '__qualname__'):
    593             state['qualname'] = func.__qualname__
--> 594         save(state)
    595         write(pickle.TUPLE)
    596         write(pickle.REDUCE)  # applies _fill_function on the tuple

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    547 
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 
    551     def persistent_id(self, obj):

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    660 
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)
    664 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
    854 
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 
    858     dispatch[dict] = save_dict

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
    880                 for k, v in tmp:
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)
    884             elif n:

~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    522             reduce = getattr(obj, "__reduce_ex__", None)
    523             if reduce is not None:
--> 524                 rv = reduce(self.proto)
    525             else:
    526                 reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle _thread._local objects
like image 427
FrancescoLS Avatar asked Apr 16 '19 12:04

FrancescoLS


1 Answer

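Your run function probably references variables from outside its scope, which are being captured into the closure. With scheduler='processes', dask has to serialize (pickle) the function, together with everything it references, in order to send it to the worker processes; objects such as open file handles or database connections typically hold thread-local state and cannot be pickled, which is what the `_thread._local` error is telling you. Be sure that any file handles or database connections get created inside the function: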

Bad:

conn = DBConn(...)
def run(row):
    return conn.do_stuff(row)

Good:

def run(row):
    conn = DBConn(...)
    return conn.do_stuff(row)
like image 68
mdurant Avatar answered Nov 07 '22 11:11

mdurant