 

Stateful UDFs in Spark SQL, or how to obtain the mapPartitions performance benefit in Spark SQL?

Using mapPartitions instead of map can give a significant performance boost in cases where the transformation requires creating or loading an expensive resource (e.g. authenticating to an external service or creating a database connection).

mapPartitions allows us to initialise the expensive resource once per partition, versus once per row as happens with the standard map.
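For example, on a plain RDD the difference looks roughly like this (a minimal sketch; connect() and extract_features() are hypothetical placeholders for the expensive setup and the per-document work):

```python
from pyspark.sql import SparkSession

def connect():
    # placeholder for the expensive setup (e.g. authenticating to a service)
    return object()

def extract_features(conn, doc):
    # placeholder per-document work that would use the connection
    return [float(len(doc))]

spark = SparkSession.builder.getOrCreate()
docs = spark.sparkContext.parallelize(["first doc", "another doc", "one more"])

# map: connect() would run once per row
# per_row = docs.map(lambda d: extract_features(connect(), d))

# mapPartitions: connect() runs once per partition and is reused for every row in it
def featurize_partition(it):
    conn = connect()
    for d in it:
        yield extract_features(conn, d)

per_partition = docs.mapPartitions(featurize_partition)
print(per_partition.collect())
```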

But if I am using DataFrames, the way I apply custom transformations is by specifying user defined functions that operate on a row-by-row basis, so I lose the ability I had with mapPartitions to do the heavy lifting once per chunk.

Is there a workaround for this in Spark SQL / DataFrames?

To be more specific:

I need to perform feature extraction on a bunch of documents. I have a function that takes a document as input and outputs a vector.

The computation itself involves initialising a connection to an external service. I don't want or need to initialise it per document; that has non-trivial overhead at scale.

asked Mar 29 '18 by Vitaliy

1 Answer

In general you have three options:

  • Convert the DataFrame to an RDD and apply mapPartitions directly. Since you already use a Python udf, you already break certain optimizations and pay the serde cost, so dropping to an RDD won't make it worse on average (first sketch below).
  • Lazily initialize the required resources (see also How to run a function on all Spark workers before processing data in PySpark?); second sketch below.
  • If the data can be serialized with Arrow, use a vectorized pandas_udf (Spark 2.3 and later; third sketch below). Unfortunately it cannot be used directly with VectorUDT, so you'd have to expand the vectors into plain arrays or columns and collapse them back later, which makes the vector size the limiting factor. You also have to be careful to keep the size of the partitions under control.
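
A minimal sketch of the first option, dropping to the RDD API and back; connect() and extract_features() are the same hypothetical placeholders as in the question sketch above:

```python
from pyspark.sql import Row

df = spark.createDataFrame([("a", "some text"), ("b", "more text")], ["id", "doc"])

def featurize_partition(rows):
    conn = connect()                      # once per partition
    for row in rows:
        yield Row(id=row["id"], features=extract_features(conn, row["doc"]))

features_df = df.rdd.mapPartitions(featurize_partition).toDF()
features_df.show()
```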
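
A sketch of lazy initialization: cache the connection in a module-level variable so each executor Python process creates it at most once, no matter how many rows it handles (again assuming the hypothetical connect()/extract_features() and the df from the previous sketch):

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

_conn = None

def get_conn():
    # created lazily, at most once per executor Python process
    global _conn
    if _conn is None:
        _conn = connect()
    return _conn

featurize = udf(lambda doc: extract_features(get_conn(), doc),
                ArrayType(DoubleType()))

with_features = df.withColumn("features", featurize("doc"))
```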
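
And a sketch of the pandas_udf route (shown in the Spark 3 type-hinted form; Spark 2.3 used pandas_udf(..., PandasUDFType.SCALAR) instead). The UDF receives a whole Arrow batch at a time, so the connection is opened once per batch; returning array&lt;double&gt; instead of VectorUDT is the "expand now, collapse later" workaround mentioned above:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("array<double>")
def featurize_batch(docs: pd.Series) -> pd.Series:
    conn = connect()                      # once per Arrow batch, not once per row
    return docs.apply(lambda d: extract_features(conn, d))

arr_df = df.withColumn("features", featurize_batch("doc"))
# The array column can be collapsed back into an ml Vector later,
# e.g. with pyspark.ml.functions.array_to_vector on Spark 3.1+.
```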

Note that stateful UserDefinedFunctions like these might have to be promoted to their non-deterministic variants so the optimizer doesn't duplicate or reorder the calls; see the sketch below.
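
A one-line sketch, reusing the featurize udf defined above:

```python
featurize_nd = featurize.asNondeterministic()
df.withColumn("features", featurize_nd("doc"))
```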

answered Jan 03 '23 by zero323