In pandas, I use the typical pattern below to apply a vectorized function to a df and return multiple values. This is really only necessary when the function produces multiple independent outputs from a single call. Here is a deliberately trivial example:
import pandas as pd

df = pd.DataFrame({'val1': [1, 2, 3, 4, 5],
                   'val2': [1, 2, 3, 4, 5]})

def myfunc(in1, in2):
    out1 = in1 + in2
    out2 = in1 * in2
    return (out1, out2)

df['out1'], df['out2'] = zip(*df.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1))
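For efficiency gains I currently write a separate function that chunks the pandas df and runs the chunks through multiprocessing. A rough sketch of that workaround (the helper names and chunk count here are illustrative, not my exact code):

import numpy as np
from multiprocessing import Pool

def apply_chunk(chunk):
    # run the same row-wise function over one chunk of rows
    return chunk.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1)

def parallel_apply(df, n_chunks=4):
    chunks = np.array_split(df, n_chunks)    # split the df into row-wise chunks
    with Pool(n_chunks) as pool:             # one worker process per chunk
        results = pool.map(apply_chunk, chunks)
    return pd.concat(results)                # reassemble in the original row order

# same zip pattern as above, but the chunks are processed in parallel
# (on Windows/spawn this needs an `if __name__ == '__main__':` guard)
df['out1'], df['out2'] = zip(*parallel_apply(df))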
I would like to use dask to accomplish this task instead of that multiprocessing workaround. Continuing the example, here is how I would run a vectorized function that returns a single value when using dask:
import dask.dataframe as dd

ddf = dd.from_pandas(df, npartitions=2)

def simple_func(in1, in2):
    out1 = in1 + in2
    return out1

df['out3'] = ddf.map_partitions(lambda x: simple_func(x['val1'], x['val2']), meta=(None, 'i8')).compute()
Now I would like to use dask and return two values as in the pandas example. I have tried adding a list to meta and returning a tuple, but I just get errors. Is this possible in dask, and if so, how?
I think the problem here stems from the way you are combining your results. Ideally you would use df.apply with the result_type='expand' argument and then join the expanded result back with df.merge. Porting this code from pandas to Dask is trivial. For pandas this would be:
import pandas as pd

def return_two_things(x, y):
    return (
        x + y,
        x * y,
    )

def pandas_wrapper(row):
    return return_two_things(row['val1'], row['val2'])

df = pd.DataFrame({
    'val1': range(1, 6),
    'val2': range(1, 6),
})
res = df.apply(pandas_wrapper, axis=1, result_type='expand')
res.columns = ['out1', 'out2']
full = df.merge(res, left_index=True, right_index=True)
print(full)
Which outputs:
val1 val2 out1 out2
0 1 1 2 1
1 2 2 4 4
2 3 3 6 9
3 4 4 8 16
4 5 5 10 25
For Dask, applying the function to the data and collating the results is virtually identical:
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=2)
# here 0 and 1 refer to the default column names of the resulting dataframe
res = ddf.apply(pandas_wrapper, axis=1, result_type='expand', meta={0: int, 1: int})
# which are renamed out1, and out2 here
res.columns = ['out1', 'out2']
# this merge is considered "embarrassingly parallel", as a worker does not need to contact
# any other workers when it is merging the results (that it created) with the input data it used.
full = ddf.merge(res, left_index=True, right_index=True)
print(full.compute())
Which outputs:
val1 val2 out1 out2
0 1 1 2 1
1 2 2 4 4
2 3 3 6 9
3 4 4 8 16
4 5 5 10 25
Late to the party. Perhaps this was not possible back when the question was asked.
I don't like the assignment pattern at the end, but as far as I can tell dask does not allow new-column assignment by tuple unpacking the way pandas does.
You need to set meta to the basic type you are returning. From my testing you can quite easily return a dict, tuple, set, or list; meta doesn't actually seem to care whether it matches the type of the returned object anyway.
import pandas
import dask.dataframe

def myfunc(in1, in2):
    out1 = in1 + in2
    out2 = in1 * in2
    return (out1, out2)

df = pandas.DataFrame({'val1': [1, 2, 3, 4, 5],
                       'val2': [1, 2, 3, 4, 5]})
ddf = dask.dataframe.from_pandas(df, npartitions=2)

# pandas version for comparison
df['out1'], df['out2'] = zip(*df.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1))

# dask version: each partition is a pandas DataFrame, so apply myfunc row-wise inside it
output = ddf.map_partitions(lambda part: part.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1), meta=tuple).compute()
out1, out2 = zip(*output)
ddf = ddf.assign(out1=pandas.Series(out1))
ddf = ddf.assign(out2=pandas.Series(out2))

print('\nPandas\n', df)
print('\nDask\n', ddf.compute())
print('\nEqual\n', ddf.eq(df).compute().all())
outputs:
Pandas
val1 val2 out1 out2
0 1 1 2 1
1 2 2 4 4
2 3 3 6 9
3 4 4 8 16
4 5 5 10 25
Dask
val1 val2 out1 out2
0 1 1 2 1
1 2 2 4 4
2 3 3 6 9
3 4 4 8 16
4 5 5 10 25
Equal
val1 True
val2 True
out1 True
out2 True
dtype: bool
It helps to note that the function you pass to map_partitions receives a partition of the larger dataframe (in this case determined by your npartitions value), which you can then treat like any other pandas dataframe and use with your .apply().
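A minimal sketch to illustrate that point, reusing myfunc and ddf from above (the per_partition name and the ('out', object) meta are placeholders I chose for a Series of tuples):

def per_partition(part):
    # `part` is a plain pandas DataFrame holding one partition's rows,
    # so the usual row-wise .apply pattern works on it unchanged
    assert isinstance(part, pandas.DataFrame)
    return part.apply(lambda x: myfunc(x['val1'], x['val2']), axis=1)

# meta describes each call's result: a Series of (out1, out2) tuples
tuples = ddf.map_partitions(per_partition, meta=('out', object)).compute()
print(tuples)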