
Multiple aggregation user defined functions in Dask dataframe

I'm processing a data set using Dask (since it doesn't fit in memory), and I want to group the instances using a different aggregation function depending on the column and its type.

Dask has a set of default aggregation functions for numerical data types, but not for strings/objects. Is there a way to implement a user defined aggregation function for strings somewhat similar to the example below?

atts_to_group = ['A', 'B']
agg_fn = {
  'C': 'mean',             #int
  'D': 'concatenate_fn1',  #string - No default fn for strings - Doesn't work
  'E': 'concatenate_fn2'   #string
}
ddf = ddf.groupby(atts_to_group).agg(agg_fn).compute().reset_index()

At this point I'm able to read the whole data set into memory after dropping irrelevant columns/rows, but I'd prefer to continue the processing in Dask, since it performs the required operations faster.

Edit: I tried adding a custom function directly to the dictionary:

def custom_concat(df):
    ...
    return df_concatd

agg_fn = {
  'C': 'mean',  #int
  'D': custom_concat(df)
}

-------------------------------------------------------
ValueError: unknown aggregate Dask DataFrame Structure:
asked Sep 03 '18 by GRoutar



1 Answer

Realised Dask provides an Aggregation data structure. A custom aggregation takes a chunk function (applied to each group within a partition) and an agg function (combining the per-partition results), so a string concatenation can be done as follows:

# Concatenates the strings in each group, separated by ","
custom_concat_D = dd.Aggregation(
    'custom_concat',
    chunk=lambda s: s.apply(','.join),   # combine within each partition
    agg=lambda s0: s0.apply(','.join),   # combine results across partitions
)
custom_concat_E = ...

atts_to_group = ['A', 'B']
agg_fn = {
  'C': 'mean',  #int
  'D': custom_concat_D,
  'E': custom_concat_E
}
ddf = ddf.groupby(atts_to_group).agg(agg_fn).compute().reset_index()

This can also be done with DataFrame.apply for a less verbose solution:

def agg_fn(x):
    return pd.Series(
        dict(
            C = x['C'].mean(), # int
            D = "{%s}" % ', '.join(x['D']), # string (concat strings)
            E = ...
        )
    )

ddf = ddf.groupby(atts_to_group).apply(agg_fn).compute().reset_index()
answered Sep 19 '22 by GRoutar