I have a huge DataFrame that I want to process with dask in order to save time. The problem is that I run into a TypeError: can't pickle _thread._local objects error as soon as it starts running. Can someone help me?
I have written a function that processes the data in the DataFrame row by row, and applying it with
out = df_query.progress_apply(lambda row: run(row), axis=1)
runs fine.
Since this takes a lot of time, I started using dask:
ddata = dd.from_pandas(df_query, npartitions=3)
out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')
The problem is that as soon as the processing starts I get this error, after a long traceback (included below): TypeError: can't pickle _thread._local objects
The run(...) function does some data manipulation, including queries to a database.
Here is the complete Traceback:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-14-aefae1f00437> in <module>
----> 1 out = ddata.map_partitions(lambda df: df.apply((lambda row: run(row)), axis=1)).compute(scheduler='processes')
~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, **kwargs)
190 get_id=_process_get_id, dumps=dumps, loads=loads,
191 pack_exception=pack_exception,
--> 192 raise_exception=reraise, **kwargs)
193 finally:
194 if cleanup:
~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
447 # Seed initial tasks into the thread pool
448 while state['ready'] and len(state['running']) < num_workers:
--> 449 fire_task()
450
451 # Main loop, wait on tasks to finish, insert new ones
~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/local.py in fire_task()
441 # Submit
442 apply_async(execute_task,
--> 443 args=(key, dumps((dsk[key], data)),
444 dumps, loads, get_id, pack_exception),
445 callback=queue.put)
~/anaconda3/envs/testenv/lib/python3.7/site-packages/dask/multiprocessing.py in _dumps(x)
24
25 def _dumps(x):
---> 26 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
27
28
~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
950 try:
951 cp = CloudPickler(file, protocol=protocol)
--> 952 cp.dump(obj)
953 return file.getvalue()
954 finally:
~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
265 self.inject_addons()
266 try:
--> 267 return Pickler.dump(self, obj)
268 except RuntimeError as e:
269 if 'recursion' in e.args[0]:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in dump(self, obj)
435 if self.proto >= 4:
436 self.framer.start_framing()
--> 437 self.save(obj)
438 self.write(STOP)
439 self.framer.end_framing()
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
769 if n <= 3 and self.proto >= 2:
770 for element in obj:
--> 771 save(element)
772 # Subtle. Same as in the big comment below.
773 if id(obj) in memo:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
769 if n <= 3 and self.proto >= 2:
770 for element in obj:
--> 771 save(element)
772 # Subtle. Same as in the big comment below.
773 if id(obj) in memo:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
547
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
551 def persistent_id(self, obj):
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
636 else:
637 save(func)
--> 638 save(args)
639 write(REDUCE)
640
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
784 write(MARK)
785 for element in obj:
--> 786 save(element)
787
788 if id(obj) in memo:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
885 k, v = tmp[0]
886 save(k)
--> 887 save(v)
888 write(SETITEM)
889 # else tmp is empty, and we're done
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
784 write(MARK)
785 for element in obj:
--> 786 save(element)
787
788 if id(obj) in memo:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_tuple(self, obj)
769 if n <= 3 and self.proto >= 2:
770 for element in obj:
--> 771 save(element)
772 # Subtle. Same as in the big comment below.
773 if id(obj) in memo:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
814
815 self.memoize(obj)
--> 816 self._batch_appends(obj)
817
818 dispatch[list] = save_list
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
838 write(MARK)
839 for x in tmp:
--> 840 save(x)
841 write(APPENDS)
842 elif n:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_list(self, obj)
814
815 self.memoize(obj)
--> 816 self._batch_appends(obj)
817
818 dispatch[list] = save_list
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_appends(self, items)
838 write(MARK)
839 for x in tmp:
--> 840 save(x)
841 write(APPENDS)
842 elif n:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
393 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
394 or themodule is None):
--> 395 self.save_function_tuple(obj)
396 return
397 else:
~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
592 if hasattr(func, '__qualname__'):
593 state['qualname'] = func.__qualname__
--> 594 save(state)
595 write(pickle.TUPLE)
596 write(pickle.REDUCE) # applies _fill_function on the tuple
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
885 k, v = tmp[0]
886 save(k)
--> 887 save(v)
888 write(SETITEM)
889 # else tmp is empty, and we're done
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
393 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
394 or themodule is None):
--> 395 self.save_function_tuple(obj)
396 return
397 else:
~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
592 if hasattr(func, '__qualname__'):
593 state['qualname'] = func.__qualname__
--> 594 save(state)
595 write(pickle.TUPLE)
596 write(pickle.REDUCE) # applies _fill_function on the tuple
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
393 or getattr(obj.__code__, 'co_filename', None) == '<stdin>'
394 or themodule is None):
--> 395 self.save_function_tuple(obj)
396 return
397 else:
~/anaconda3/envs/testenv/lib/python3.7/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
592 if hasattr(func, '__qualname__'):
593 state['qualname'] = func.__qualname__
--> 594 save(state)
595 write(pickle.TUPLE)
596 write(pickle.REDUCE) # applies _fill_function on the tuple
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
547
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
551 def persistent_id(self, obj):
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
660
661 if state is not None:
--> 662 save(state)
663 write(BUILD)
664
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
547
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
551 def persistent_id(self, obj):
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
660
661 if state is not None:
--> 662 save(state)
663 write(BUILD)
664
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save_dict(self, obj)
854
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
858 dispatch[dict] = save_dict
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in _batch_setitems(self, items)
880 for k, v in tmp:
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
884 elif n:
~/anaconda3/envs/testenv/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
522 reduce = getattr(obj, "__reduce_ex__", None)
523 if reduce is not None:
--> 524 rv = reduce(self.proto)
525 else:
526 reduce = getattr(obj, "__reduce__", None)
TypeError: can't pickle _thread._local objects
Your run function probably references variables from outside its scope, which are being captured into the closure. Make sure that any file handles or database connections are created inside the function itself.
Bad:
conn = DBConn(...)
def run(row):
    return conn.do_stuff(row)
Good:
def run(row):
    conn = DBConn(...)
    return conn.do_stuff(row)
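For dask specifically, the same principle applies at the partition level: with scheduler='processes', everything the mapped function references must be pickled and sent to the worker processes, and thread-local objects (which many database connection and session objects contain) cannot be. A common pattern is to open the connection once per partition rather than once per row, which avoids both the pickling error and a reconnect for every row. A minimal sketch, assuming the hypothetical DBConn from the example above and a run variant that accepts the connection as an argument:

import dask.dataframe as dd

def process_partition(df):
    # Created inside the function, so each worker process builds its own
    # connection instead of trying to unpickle one from the parent process.
    conn = DBConn(...)
    try:
        return df.apply(lambda row: run(row, conn), axis=1)
    finally:
        conn.close()

ddata = dd.from_pandas(df_query, npartitions=3)
out = ddata.map_partitions(process_partition).compute(scheduler='processes')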