I have a trivially parallelizable task: computing results independently for many tables split across many files. I can construct lists of delayed or dask.dataframe objects (and have also tried, e.g., a dict), but I cannot get all of the results to compute at once. I can get individual results out of a dask-graph-style dictionary with .get(), but again can't easily compute all the results together. Here's a minimal example:
>>> df = dd.from_pandas(pd.DataFrame({'a': [1,2]}), npartitions=1)
>>> numbers = [df['a'].mean() for _ in range(2)]
>>> dd.compute(numbers)
([<dask.dataframe.core.Scalar at 0x7f91d1523978>,
<dask.dataframe.core.Scalar at 0x7f91d1523a58>],)
Similarly:
>>> import dask
>>> from dask import delayed
>>> @delayed
... def mean(data):
...     return sum(data) / len(data)
>>> delayed_numbers = [mean([1,2]) for _ in range(2)]
>>> dask.compute(delayed_numbers)
([Delayed('mean-0e0a0dea-fa92-470d-b06e-b639fbaacae3'),
Delayed('mean-89f2e361-03b6-4279-bef7-572ceac76324')],)
I would like to get [1.5, 1.5], which is what I would expect based on the delayed collections docs.
For my real problem, I would actually like to compute on tables in an HDF5 file, but given that I can get that to work with dask.get(), I'm pretty sure I'm already specifying the delayed / dask dataframe step correctly.
I would be interested in a solution that directly produces a dictionary, but I can also just pass a list of (key, value) tuples to dict(), which is probably not a huge performance hit.
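For reference, here is roughly what the real per-table step looks like (the file name data.h5, the table keys, and the column 'a' are placeholders):

>>> import dask.dataframe as dd
>>> keys = ['/table1', '/table2']
>>> # one lazy per-table mean; nothing is computed yet
>>> results = {k: dd.read_hdf('data.h5', key=k)['a'].mean() for k in keys}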
Compute takes many collections as separate arguments. Try splatting out your arguments as follows:
In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = dd.from_pandas(pd.DataFrame({'a': [1,2]}), npartitions=1)
In [4]: numbers = [df['a'].mean() for _ in range(2)]
In [5]: dd.compute(*numbers) # note the *
Out[5]: (1.5, 1.5)
Or, as might be more common:
In [6]: dd.compute(df.a.mean(), df.a.std())
Out[6]: (1.5, 0.707107)
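For the dictionary result mentioned in the question, one approach (a minimal sketch reusing the same df) is to splat out the dict's values and zip the computed results back onto the keys:

In [7]: named = {'mean': df.a.mean(), 'std': df.a.std()}

In [8]: dict(zip(named, dd.compute(*named.values())))  # keys keep insertion order
Out[8]: {'mean': 1.5, 'std': 0.707107}

The same splat also fixes the delayed example above: dask.compute(*delayed_numbers) returns (1.5, 1.5).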