In dask distributed I get the following warning, which I would not expect:
/home/miniconda3/lib/python3.6/site-packages/distributed/worker.py:739: UserWarning: Large object of size 1.95 MB detected in task graph:
(['int-58e78e1b34eb49a68c65b54815d1b158', 'int-5cd ... 161071d7ae7'],)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s))
The reason I'm surprised is that I'm doing exactly what the warning suggests:
import dask.dataframe as dd
import pandas
from dask.distributed import Client, LocalCluster
c = Client(LocalCluster())
dask_df = dd.from_pandas(pandas.DataFrame.from_dict({'A':[1,2,3,4,5]*1000}), npartitions=10)
filter_list = c.scatter(list(range(2,100000,2)))
mask = c.submit(dask_df['A'].isin, filter_list)
dask_df[mask.result()].compute()
So my question is: Am I doing something wrong or is this a bug?
pandas='0.22.0'
dask='0.17.0'
When we create a Client object, it registers itself as the default Dask scheduler, so all .compute() methods automatically start using the distributed system. We can stop this behavior by passing the set_as_default=False keyword argument when starting the Client.
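A minimal sketch of the set_as_default=False behavior, assuming a recent dask.distributed and an in-process cluster (processes=False) so it runs without spawning worker processes. The inc function is a hypothetical example task.

```python
import dask
from dask.distributed import Client


def inc(x):
    # Hypothetical example task.
    return x + 1


# Start an in-process cluster without registering it as the default scheduler.
with Client(processes=False, set_as_default=False) as client:
    lazy = dask.delayed(inc)(1)
    # No default client is registered, so this .compute() should fall back
    # to dask's local scheduler instead of the cluster.
    local_result = lazy.compute()
    # To run on the cluster, hand the graph to the client explicitly.
    cluster_result = client.compute(lazy).result()

print(local_result, cluster_result)
```

Both calls compute the same value; the only difference is which scheduler executes the graph.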
Dask supports a real-time task framework that extends Python's concurrent.futures interface. Dask futures reimplement most of the Python futures API, allowing you to scale a Python futures workflow across a Dask cluster with minimal code changes.
The Client connects users to a Dask cluster. It provides an asynchronous user interface around functions and futures. This class resembles the executors in concurrent.futures, but also allows Future objects to be passed within submit/map calls.
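The submit/result pattern that Dask futures mirror can be shown with the standard library alone. This sketch uses concurrent.futures.ThreadPoolExecutor; the assumption (not exercised here) is that a dask.distributed Client could be passed to the hypothetical run helper instead, since Client.submit and Future.result follow the same shape.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


def run(executor):
    # submit() schedules square(i) and immediately returns a future;
    # result() blocks until that future's value is available.
    futures = [executor.submit(square, i) for i in range(5)]
    return [f.result() for f in futures]


with ThreadPoolExecutor(max_workers=2) as pool:
    print(run(pool))  # [0, 1, 4, 9, 16]
```

Only submit and result are used, because Executor.map and Client.map do not return the same kind of object.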
Dask can run fully asynchronously and so interoperate with other highly concurrent applications.
The main reason dask is complaining isn't the list; it's the pandas DataFrame inside the dask DataFrame:
dask_df = dd.from_pandas(pandas.DataFrame.from_dict({'A':[1,2,3,4,5]*1000}), npartitions=10)
You are creating a fairly large amount of data locally when you build a pandas DataFrame in your client session. You then operate on it with the cluster, which requires moving that pandas DataFrame from your session to the cluster.
You're welcome to ignore these warnings, but in general I would not be surprised if performance here is worse than with pandas alone.
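There are a few other things going on here. Scattering a list produces one future per element (about 50,000 of them here), which may not be what you want. You're also calling submit on a method of a dask object, which is usually unnecessary, since dask_df['A'].isin(...) already builds a lazy computation.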