
Apply function to grouped data frame in Dask: How do you specify the grouped Dataframe as argument in the function?

I have a dask dataframe grouped by the index (first_name).

import pandas as pd
import numpy as np

from multiprocessing import cpu_count

from dask import dataframe as dd
from dask.multiprocessing import get
from dask.distributed import Client
from fuzzywuzzy import fuzz  # assumed source of fuzz.partial_ratio used in contraster below


NCORES = cpu_count()
client = Client()

entities = pd.DataFrame({
    'first_name': ['Jake', 'John', 'Danae', 'Beatriz', 'Jacke', 'Jon'],
    'last_name': ['Del Toro', 'Foster', 'Smith', 'Patterson', 'Toro', 'Froster'],
    'ID': ['X', 'U', 'X', 'Y', '12', '13'],
})

df = dd.from_pandas(entities, npartitions=NCORES)
df = client.persist(df.set_index('first_name'))

(In real life, entities has several thousand rows.)

I want to apply a user-defined function to each grouped dataframe: every row should be compared with all the other rows in its group (similar to the question "Pandas compare each row with all rows in data frame and save results in list for each row").

This is the function I am trying to apply:

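def contraster(x, DF):
    # True for every row of DF whose last_name fuzzily matches x
    matches = DF.apply(lambda row: fuzz.partial_ratio(row['last_name'], x) >= 50, axis=1)
    # positions of the matching rows
    return [i for i, matched in enumerate(matches) if matched]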

For the test entities data frame, you could apply the function as usual:

entities.apply(lambda row: contraster(row['last_name'], entities), axis =1)

And the expected result is:

Out[35]: 
0    [0, 4]
1    [1, 5]
2       [2]
3       [3]
4    [0, 4]
5    [1, 5]
dtype: object

When entities is huge, the solution is to use Dask. Note that DF in the contraster function must be the grouped dataframe.

I am trying to use the following:

df.groupby('first_name').apply(func=contraster, args=????)

But how should I specify the grouped dataframe (i.e. DF in contraster)?

asked Mar 19 '18 06:03 by nanounanue



1 Answer

The function you provide to groupby-apply should take a Pandas dataframe or series as input and ideally return one (or a scalar value) as output. Extra parameters are fine, but they should be secondary, not the first argument. This is the same in both Pandas and Dask dataframe.

def func(df, x=None):
    # do whatever you want here
    # the input to this function will have all the same first name
    return pd.DataFrame({'x': [x] * len(df),
                         'count': len(df),
                         'first_name': df.first_name})

You can then call df.groupby as normal:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'first_name':['Alice', 'Alice', 'Bob'],
                   'last_name': ['Adams', 'Jones', 'Smith']})

ddf = dd.from_pandas(df, npartitions=2)

ddf.groupby('first_name').apply(func, x=3).compute()

This will produce the same output in either pandas or dask.dataframe:

   count first_name  x
0      2      Alice  3
1      2      Alice  3
2      1        Bob  3
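
Applied to the question, the same pattern means reshaping contraster so that the group itself is the first argument and the row-by-row comparison happens inside the function. The following is only a minimal sketch under stated assumptions: contrast_group, similarity and threshold are hypothetical names, and difflib.SequenceMatcher from the standard library stands in for fuzz.partial_ratio, so the scores will differ from fuzzywuzzy's. It runs on plain pandas with a small made-up frame.

```python
from difflib import SequenceMatcher

import pandas as pd


def similarity(a, b):
    """Stand-in scorer on a 0-100 scale (assumption: NOT fuzz.partial_ratio)."""
    return 100 * SequenceMatcher(None, a, b).ratio()


def contrast_group(group, threshold=50):
    """Compare every row of one group with all rows of that same group.

    `group` is the pandas DataFrame that groupby-apply passes in; extra
    parameters such as `threshold` come after it.
    """
    names = group['last_name'].tolist()
    labels = group.index.tolist()
    matches = [
        # index labels of the rows whose last_name is similar enough to `name`
        [labels[j] for j, other in enumerate(names)
         if similarity(name, other) >= threshold]
        for name in names
    ]
    # one output row per input row, aligned on the original index
    return pd.DataFrame({'matches': matches}, index=group.index)


# Illustrative data only (not the question's frame)
entities = pd.DataFrame({
    'first_name': ['Alice', 'Alice', 'Alice', 'Bob'],
    'last_name': ['Del Toro', 'Toro', 'Smith', 'Foster'],
})

result = (
    entities
    .groupby('first_name', group_keys=False)[['last_name']]
    .apply(contrast_group, threshold=50)
)
print(result)
```

In this sketch the rows 'Del Toro' and 'Toro' match each other, while 'Smith' and 'Foster' match only themselves. With a Dask dataframe the same function should be usable through ddf.groupby('first_name').apply(contrast_group, threshold=50, meta=...), where meta describes the output columns; that variant is not run here.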
answered Oct 20 '22 11:10 by MRocklin