
How do I actually get dask to compute a list of delayed or dask-container-based results?

Tags: python, dask

I have a trivially parallelizable task: computing results independently for many tables split across many files. I can build lists of delayed or dask.dataframe objects (I have also tried a dict), but I cannot get all of the results to compute. I can get individual results from a dask-graph-style dictionary using .get(), but that still gives me no easy way to compute everything at once. Here's a minimal example:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> df = dd.from_pandas(pd.DataFrame({'a': [1,2]}), npartitions=1)
>>> numbers = [df['a'].mean() for _ in range(2)]
>>> dd.compute(numbers)
([<dask.dataframe.core.Scalar at 0x7f91d1523978>,
  <dask.dataframe.core.Scalar at 0x7f91d1523a58>],)

Similarly:

>>> import dask
>>> from dask import delayed
>>> @delayed
... def mean(data):
...     return sum(data) / len(data)
...
>>> delayed_numbers = [mean([1,2]) for _ in range(2)]
>>> dask.compute(delayed_numbers)
([Delayed('mean-0e0a0dea-fa92-470d-b06e-b639fbaacae3'),
  Delayed('mean-89f2e361-03b6-4279-bef7-572ceac76324')],)

I would like to get [1.5, 1.5], which is what I would expect based on the delayed collections docs.

For my real problem, I would actually like to compute on tables in an HDF5 file. Since I can get that to work with dask.get(), I'm fairly sure I'm already specifying the delayed / dask dataframe step correctly.

I would be interested in a solution that directly produces a dictionary, but I can also just pass a list of (key, value) tuples to dict(), which is probably not a huge performance hit.

asked May 24 '16 at 00:05 by Dav Clark


1 Answer

compute takes multiple collections as separate positional arguments rather than as a single list. Unpack (splat) your list with * as follows:

In [1]: import dask.dataframe as dd

In [2]: import pandas as pd

In [3]: df = dd.from_pandas(pd.DataFrame({'a': [1,2]}), npartitions=1)

In [4]: numbers = [df['a'].mean() for _ in range(2)]

In [5]: dd.compute(*numbers)  # note the *
Out[5]: (1.5, 1.5)

Or, as might be more common:

In [6]: dd.compute(df.a.mean(), df.a.std())
Out[6]: (1.5, 0.707107)
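
The same unpacking should carry over to the delayed list from the question, and it also gives a straightforward way to end up with a dictionary. The following is only a sketch under a few assumptions: the `tables` dict and its key names are hypothetical stand-ins for the real HDF5 tables, and `mean` is the question's function with an explicit `return` added.

```python
import dask
from dask import delayed


@delayed
def mean(data):
    # Return the value explicitly; without `return` the task would yield None.
    return sum(data) / len(data)


# Hypothetical in-memory data standing in for the tables read from HDF5.
tables = {"table_a": [1, 2], "table_b": [3, 5]}

# Build one lazy task per table; nothing is computed at this point.
lazy = {name: mean(values) for name, values in tables.items()}

# Unpack the Delayed objects so that each one is a separate argument.
computed = dask.compute(*lazy.values())

# compute returns a tuple in argument order, so zip restores the keys.
results = dict(zip(lazy.keys(), computed))
print(results)  # {'table_a': 1.5, 'table_b': 4.0}
```

Passing everything to a single compute call lets dask schedule the tasks together rather than one at a time. As far as I know, more recent dask releases also traverse built-in containers such as lists and dicts passed to compute, but explicit unpacking should work either way.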
answered Oct 16 '22 at 08:10 by MRocklin