 

Apply a custom function to a spark dataframe group

I have a very big table of time series data that have these columns:

  • Timestamp
  • LicensePlate
  • UberRide#
  • Speed

Each collection of LicensePlate/UberRide data should be processed considering the whole set of data. In other words, I do not need to process the data row by row, but all rows grouped by (LicensePlate/UberRide) together.

I am planning to use Spark with the DataFrame API, but I am confused about how to perform a custom calculation over a grouped Spark DataFrame.

What I need to do is:

  1. Get all data
  2. Group by some columns
  3. For each Spark DataFrame group, apply a function f(x) and return a custom object per group
  4. Get the final result by applying g(x) to the per-group objects, returning a single custom object

How can I do steps 3 and 4? Any hints on which Spark API (DataFrame, Dataset, RDD, maybe pandas...) I should use?

The whole workflow can be seen below:

[Workflow diagram]

asked Sep 20 '16 by guilhermecgs




2 Answers

  • While Spark provides some ways to integrate with Pandas, it doesn't make Pandas distributed. So whatever you do with Pandas in Spark is simply a local operation (either on the driver, or on an executor when used inside transformations).

    If you're looking for a distributed system with a Pandas-like API, you should take a look at dask.
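
    For example, a minimal dask sketch of the same grouped computation (the file path, column names, and per-group function are illustrative, not from the question):

        import dask.dataframe as dd

        # Load the time series (hypothetical path)
        ddf = dd.read_csv("rides/*.csv")

        # Apply a custom function to each (LicensePlate, UberRide) group;
        # `meta` declares the output type so dask can build the graph lazily
        result = ddf.groupby(["LicensePlate", "UberRide"]).apply(
            lambda group: group["Speed"].mean(),  # illustrative per-group f(x)
            meta=("mean_speed", "f8"),
        ).compute()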

  • You can define User Defined Aggregate Functions or Aggregators to process grouped Datasets, but this part of the API is directly accessible only in Scala. It is not that hard to write a Python wrapper once you create one.
  • The RDD API provides a number of functions which can be used to perform operations on groups, starting with the low-level repartition / repartitionAndSortWithinPartitions and ending with a number of *byKey methods (combineByKey, groupByKey, reduceByKey, etc.).

    Which one is applicable in your case depends on the properties of the function you want to apply (is it associative and commutative, can it work on streams, does it expect a specific order).

    The most general but inefficient approach can be summarized as follows:

    h(rdd.keyBy(f).groupByKey().mapValues(g).collect())
    

    where f maps from value to key, g corresponds to the per-group aggregation and h is the final merge. Most of the time you can do much better than that, so it should be used only as a last resort.
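
    A concrete PySpark sketch of that pattern (the mean-speed aggregation is illustrative; df is the question's DataFrame):

        rdd = df.rdd  # rows with Timestamp, LicensePlate, UberRide, Speed

        def f(row):  # maps a value to its group key
            return (row["LicensePlate"], row["UberRide"])

        def g(rows):  # per-group aggregation over an iterable of rows
            speeds = [row["Speed"] for row in rows]
            return sum(speeds) / len(speeds)

        def h(pairs):  # final merge, runs on the driver
            return dict(pairs)

        result = h(rdd.keyBy(f).groupByKey().mapValues(g).collect())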

  • Relatively complex logic can be expressed using DataFrames / Spark SQL and window functions.
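
    For instance, a hedged sketch computing the speed change between consecutive samples of each ride (assumes the question's DataFrame df):

        from pyspark.sql import functions as F
        from pyspark.sql.window import Window

        # One window per (LicensePlate, UberRide), ordered in time
        w = Window.partitionBy("LicensePlate", "UberRide").orderBy("Timestamp")

        df_with_delta = df.withColumn(
            "speed_change", F.col("Speed") - F.lag("Speed").over(w)
        )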

  • See also Applying UDFs on GroupedData in PySpark (with functioning python example)

answered Sep 22 '22 by zero323


What you are looking for exists since Spark 2.3: pandas vectorized UDFs. They allow you to group a DataFrame and apply custom transformations with pandas, distributed on each group:

df.groupBy("groupColumn").apply(myCustomPandasTransformation)

It is very easy to use, so I will just put a link to Databricks' presentation of pandas UDFs.
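
Here is a minimal grouped-map sketch using the question's columns (the output schema and the function body are illustrative placeholders, not the asker's actual f(x)):

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # Declared output schema of the per-group result (illustrative)
    schema = "LicensePlate string, UberRide int, mean_speed double"

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def myCustomPandasTransformation(pdf):
        # pdf is a pandas DataFrame containing one whole group
        return pd.DataFrame([{
            "LicensePlate": pdf["LicensePlate"].iloc[0],
            "UberRide": pdf["UberRide"].iloc[0],
            "mean_speed": pdf["Speed"].mean(),
        }])

    result = df.groupBy("LicensePlate", "UberRide").apply(myCustomPandasTransformation)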

However, I don't know of such a practical way to do grouped transformations in Scala yet, so any additional advice is welcome.

EDIT: in Scala, you can achieve the same thing since earlier versions of Spark, using Dataset's groupByKey + mapGroups/flatMapGroups.

answered Sep 21 '22 by Florent F