
Dask prints a warning to use client.scatter although I'm using the suggested approach

In dask distributed I get the following warning, which I would not expect:

/home/miniconda3/lib/python3.6/site-packages/distributed/worker.py:739: UserWarning: Large object of size 1.95 MB detected in task graph: 
  (['int-58e78e1b34eb49a68c65b54815d1b158', 'int-5cd ... 161071d7ae7'],)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s))

The reason I'm surprised is that I'm doing exactly what the warning suggests:

import dask.dataframe as dd
import pandas
from dask.distributed import Client, LocalCluster

c = Client(LocalCluster())
dask_df = dd.from_pandas(pandas.DataFrame.from_dict({'A':[1,2,3,4,5]*1000}), npartitions=10)
filter_list = c.scatter(list(range(2,100000,2)))
mask = c.submit(dask_df['A'].isin, filter_list)
dask_df[mask.result()].compute()

So my question is: Am I doing something wrong or is this a bug?

pandas='0.22.0'
dask='0.17.0'
dennis-w asked Feb 22 '18 14:02



1 Answer

The main reason dask is complaining isn't the list; it's the pandas dataframe inside the dask dataframe.

dask_df = dd.from_pandas(pandas.DataFrame.from_dict({'A':[1,2,3,4,5]*1000}), npartitions=10)

You create a biggish amount of data locally when you build the pandas dataframe in your local session, and then you operate on it on the cluster. That requires moving the pandas dataframe to the cluster.

You're welcome to ignore these warnings, but in general I would not be surprised if performance here is worse than with pandas alone.

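There are a few other things going on here. Scattering a list produces a bunch of futures, one per element, which may not be what you want. You're also calling submit on a dask object, which is usually unnecessary, because operations on dask objects are already lazy and run on the cluster when computed.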

MRocklin answered Oct 25 '22 01:10