
How to apply a function to a dask dataframe and return multiple values?

In pandas, I use the typical pattern below to apply a vectorized function to a df and return multiple values. This is really only necessary when the function in question produces multiple independent outputs from a single call. See my overly trivial example:

import pandas as pd
df = pd.DataFrame({'val1': [1, 2, 3, 4, 5],
                   'val2': [1, 2, 3, 4, 5]})

def myfunc(in1, in2):
    out1 = in1 + in2
    out2 = in1 * in2
    return (out1, out2)

df['out1'], df['out2'] = zip(*df.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1))

Currently I write a separate function to chunk the pandas df and use multiprocessing for efficiency gains, but I would like to use dask to accomplish this instead.
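(For reference, my chunking wrapper looks roughly like the sketch below; the helper names and process count are just illustrative.)

import numpy as np
from multiprocessing import Pool

def apply_chunk(chunk):
    # run the row-wise apply on one chunk of the dataframe
    return chunk.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1)

def parallel_apply(df, nproc=4):
    chunks = np.array_split(df, nproc)           # split into nproc pieces
    with Pool(nproc) as pool:
        results = pool.map(apply_chunk, chunks)  # one chunk per worker process
    return pd.concat(results)                    # reassemble in original order

# then, as before:
# df['out1'], df['out2'] = zip(*parallel_apply(df))

Continuing the example, here is how I would run a vectorized function to return a single value when using dask: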

import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=2)

def simple_func(in1, in2):
    out1 = in1 + in2
    return out1

df['out3'] = ddf.map_partitions(lambda x: simple_func(x['val1'], x['val2']), meta=(None, 'i8')).compute()

Now I would like to use dask and return two values, as in the pandas example. I have tried adding a list to meta and returning a tuple, but I just get errors. What I tried looked roughly like this:
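# one of the variations I tried; it just raises errors
out = ddf.map_partitions(
    lambda x: myfunc(x['val1'], x['val2']),
    meta=[('out1', 'i8'), ('out2', 'i8')])

Is this possible in dask, and if so, how?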

asked Jan 18 '17 by Jasper1918


2 Answers

I think the problem here is that the way you are combining your results is not great. Ideally you would use df.apply with the result_type='expand' argument and then use df.merge. Porting this code from pandas to Dask is trivial. For pandas this would be:

Pandas

import pandas as pd

def return_two_things(x, y):
    return (
        x + y,
        x * y,
    )

def pandas_wrapper(row):
    return return_two_things(row['val1'], row['val2'])

df = pd.DataFrame({
    'val1': range(1, 6),
    'val2': range(1, 6),
})

res = df.apply(pandas_wrapper, axis=1, result_type='expand')
res.columns = ['out1', 'out2']
full = df.merge(res, left_index=True, right_index=True)
print(full)

Which outputs:

   val1  val2  out1  out2
0     1     1     2     1
1     2     2     4     4
2     3     3     6     9
3     4     4     8    16
4     5     5    10    25

Dask

For Dask, applying the function to the data and collating the results is virtually identical:

import dask.dataframe as dd

ddf = dd.from_pandas(df, npartitions=2)
# here 0 and 1 refer to the default column names of the resulting dataframe
res = ddf.apply(pandas_wrapper, axis=1, result_type='expand', meta={0: int, 1: int})
# which are renamed out1, and out2 here
res.columns = ['out1', 'out2']
# this merge is considered "embarrassingly parallel", as a worker does not need to contact 
# any other workers when it is merging the results (that it created) with the input data it used.
full = ddf.merge(res, left_index=True, right_index=True)

print(full.compute())

Outputting:

   val1  val2  out1  out2
0     1     1     2     1
1     2     2     4     4
2     3     3     6     9
3     4     4     8    16
4     5     5    10    25
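
As a side note, a sketch of an equivalent approach (not something the merge requires): since res is produced from ddf partition by partition, the two share an index and partitioning, so the columns can also be attached with assign:

# equivalent to the merge above; relies on res sharing ddf's index and partitioning
full = ddf.assign(out1=res['out1'], out2=res['out2'])
print(full.compute())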
answered Oct 04 '22 by Dunes


Late to the party. Perhaps this was not possible back when the question was asked.

I don't like the tuple-unpacking assignment pattern at the end. As far as I can find, dask does not allow new column assignment the way pandas does.

You need to set meta to the basic type you are returning. In my testing you can quite simply return a dict, tuple, set, or list; meta doesn't actually seem to care whether the type matches the return object's type anyway.

import pandas
import dask.dataframe

def myfunc(in1, in2):
    out1 = in1 + in2
    out2 = in1 * in2
    return (out1, out2)

df = pandas.DataFrame({'val1': [1, 2, 3, 4, 5],
                       'val2': [1, 2, 3, 4, 5]})
ddf = dask.dataframe.from_pandas(df, npartitions=2)

# pandas version, for comparison
df['out1'], df['out2'] = zip(*df.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1))

# dask version: run the same row-wise apply inside each partition
output = ddf.map_partitions(
    lambda part: part.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1),
    meta=tuple).compute()

# unpack the computed Series of tuples into two flat sequences
out1, out2 = zip(*output)

# attach the new columns
ddf = ddf.assign(out1 = pandas.Series(out1))
ddf = ddf.assign(out2 = pandas.Series(out2))

print('\nPandas\n',df)
print('\nDask\n',ddf.compute())
print('\nEqual\n',ddf.eq(df).compute().all())

Outputs:

Pandas
    val1  val2  out1  out2
0     1     1     2     1
1     2     2     4     4
2     3     3     6     9
3     4     4     8    16
4     5     5    10    25

Dask
    val1  val2  out1  out2
0     1     1     2     1
1     2     2     4     4
2     3     3     6     9
3     4     4     8    16
4     5     5    10    25

Equal
val1    True
val2    True
out1    True
out2    True
dtype: bool

It helps to note that the lambda passed to map_partitions receives a partition of the larger dataframe (based, in this case, on your npartitions value), which you then treat like any other pandas dataframe and process with .apply().
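
As a sketch of a variant along the same lines: if the partition function returns a pandas DataFrame instead of a Series of tuples, the meta can name the output columns directly and the zip/assign round-trip disappears. (This uses a fresh two-column frame, since ddf above has already gained out1 and out2.)

base = dask.dataframe.from_pandas(
    pandas.DataFrame({'val1': [1, 2, 3, 4, 5],
                      'val2': [1, 2, 3, 4, 5]}),
    npartitions=2)

def per_partition(part):
    # `part` is an ordinary pandas DataFrame holding one partition
    out1, out2 = myfunc(part['val1'], part['val2'])  # vectorized over whole columns
    return part.assign(out1=out1, out2=out2)

result = base.map_partitions(
    per_partition,
    meta={'val1': 'i8', 'val2': 'i8', 'out1': 'i8', 'out2': 'i8'})
print(result.compute())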

answered Oct 04 '22 by Kenneth D