Using mapPartitions instead of map can give a significant performance boost when the transformation involves creating or loading an expensive resource (e.g. authenticating to an external service or creating a DB connection).
mapPartitions lets us initialise the expensive resource once per partition versus once per row, as happens with the standard map.
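For example, with the RDD API (connect() and transform() are hypothetical stand-ins for the expensive setup and the per-row work):

```python
def process_partition(rows):
    conn = connect()                 # expensive setup, paid once per partition
    for row in rows:
        yield transform(conn, row)   # per-row work reusing the connection

result = rdd.mapPartitions(process_partition)
# versus rdd.map(lambda row: transform(connect(), row)),
# which would pay for connect() on every row
```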
But if I am using DataFrames, the way I apply custom transformations is by specifying user-defined functions that operate on a row-by-row basis, so I lose the ability I had with mapPartitions to perform the heavy lifting once per chunk.
Is there a workaround for this in spark-sql/dataframe?
To be more specific:
I need to perform feature extraction on a bunch of documents. I have a function that takes a document and outputs a vector.
The computation itself involves initialising a connection to an external service. I don't want or need to initialise it per document; at scale that overhead is non-trivial.
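Schematically, my current approach looks like this (connect() and its featurize() method are placeholders for the real service client, and df is assumed to have a document column):

```python
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.sql.functions import udf

@udf(returnType=VectorUDT())
def extract_features(document):
    client = connect()            # placeholder client; initialised once per row!
    return Vectors.dense(client.featurize(document))

df = df.withColumn("features", extract_features("document"))
```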
In general you have three options:
1. Convert the DataFrame to an RDD and apply mapPartitions directly. Since you use a Python udf you already break certain optimizations and pay the serde cost, and using RDD won't make it worse on average. For example:
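A minimal sketch, again with a hypothetical connect() client, assuming df has id and document columns:

```python
from pyspark.ml.linalg import Vectors

def featurize_partition(rows):
    client = connect()               # hypothetical client, built once per partition
    for row in rows:
        yield (row["id"], Vectors.dense(client.featurize(row["document"])))

features = (df
    .rdd
    .mapPartitions(featurize_partition)
    .toDF(["id", "features"]))
```

Converting back with toDF keeps the rest of the pipeline in the DataFrame API.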
2. Use pandas_udf (Spark 2.3 and later). Unfortunately you cannot use it directly with VectorUDT, so you'd have to expand vectors and collapse them later; the limiting factor here is the size of the vector. You also have to be careful to keep the size of partitions under control. For example:
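A sketch using the Spark 3 type-hint syntax (on 2.3/2.4 you would pass PandasUDFType.SCALAR instead), returning a plain array column; connect() is again a hypothetical client:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType

@pandas_udf(ArrayType(DoubleType()))
def extract_features(documents: pd.Series) -> pd.Series:
    client = connect()              # initialised once per Arrow batch, not per row
    return documents.map(lambda doc: list(client.featurize(doc)))

features = df.withColumn("features", extract_features("document"))
```

The array column can be collapsed back to a vector afterwards, e.g. with pyspark.ml.functions.array_to_vector on Spark 3.1+ or a small udf returning VectorUDT().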
3. Keep the standard udf but initialise the required resources lazily, so the connection is created on first use rather than once per row (see the sketch below).

Note that using UserDefinedFunctions might require promoting objects to non-deterministic variants.
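A sketch of the lazy-initialisation pattern combined with asNondeterministic (connect() is still a placeholder; the singleton is created at most once per deserialized copy of the function, in practice once per task or worker rather than once per row):

```python
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.sql.functions import udf

_client = None

def get_client():
    global _client
    if _client is None:              # first call in this process: build the client
        _client = connect()
    return _client

@udf(returnType=VectorUDT())
def extract_features(document):
    return Vectors.dense(get_client().featurize(document))

# If the result depends on external state, tell the optimizer not to assume
# it can freely re-evaluate, reorder, or cache calls:
extract_features = extract_features.asNondeterministic()
```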