I'm using Apache Spark with Scala to create an ML Pipeline. One of the Transformers in my pipeline does a costly join operation early on in the process. Since I have a lot of features in my ParamGrid, this means the program has to hold this enormous, joined DataFrame in memory while it optimizes over each feature in the grid.

To try and solve this problem, I created a custom Transformer that caches this large, intermediate DataFrame by writing it to a Parquet file in S3 and returning a DataFrame that is read back from that Parquet. This worked well and increased the speed of the model until I added features to the ParamGrid that are staged before the caching stage. When I write the Parquet to S3, I use a path that is determined by:
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}

class Cacher(override val uid: String) extends Transformer {
  // the cachePath value determines the path within the S3 bucket
  lazy val cachePath = Identifiable.randomUID(uid + "transformer-cache")
  // ...
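For reference, the transform itself boils down to something like this sketch (the bucket name below is a placeholder, not my real path):

// inside Cacher; "my-bucket" stands in for the real bucket
override def transform(dataset: Dataset[_]): DataFrame = {
  val target = s"s3://my-bucket/$cachePath"
  dataset.toDF.write.parquet(target)          // materialize the expensive join once
  dataset.sparkSession.read.parquet(target)   // downstream stages read the cached copy
}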
I think I am misunderstanding how uid works... my belief was that whenever Spark optimized over the ParamGrid, it took whatever classes were staged at that point in the pipeline, created new instances of them, and gave them new, unique uids to keep track of them. I suspect that the caching is going awry because Spark is not giving a unique uid to the new Transformer instances it creates, which means that the cached Parquet is being continually overwritten whenever a new instance of the caching Transformer is created. Can anybody give any pointers on how to generate unique, random uids for each instance of a stage that the pipeline creates?

Cheers!
Step by step:

- uid is required by the Identifiable trait (Transformer extends PipelineStage, which extends Params, which extends Identifiable).
- According to the Identifiable docs, uid is:

  "An immutable unique ID for the object and its derivatives."

- In general, Params are mutable. Setting parameters returns this and doesn't affect uid:
import org.apache.spark.ml.feature.OneHotEncoder

val enc = new OneHotEncoder()
val enc_ = enc.setInputCol("foo")

enc == enc_          // Boolean = true
enc.uid == enc_.uid  // Boolean = true
- Copying Params creates a new instance but keeps the same uid (note the "and its derivatives" part of the quote above):
val encCopy = enc.copy(new org.apache.spark.ml.param.ParamMap())

encCopy == enc          // Boolean = false
encCopy.uid == enc.uid  // Boolean = true
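That copy behavior is exactly what happens during grid search: each ParamMap from the grid is applied by copying the stage, and every copy shares the parent's uid. A quick sketch of the consequence, reusing enc from above (dropLast is just an arbitrary parameter picked for illustration):

import org.apache.spark.ml.param.ParamMap

// the grid applies each ParamMap via copy, and the copy keeps the uid,
// so a uid-derived cache path is identical for every grid variant
val variant = enc.copy(ParamMap(enc.dropLast -> false))
variant.uid == enc.uid  // Boolean = true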
You could try to override the copy method to avoid copying the parent uid, but that seems to be in conflict with the whole idea behind making Params Identifiable.
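For completeness, such an override might look like the sketch below (copyValues is inherited from Params and transfers the parameter values onto the new instance), but it still cuts against that contract:

import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable

// inside Cacher: mint a fresh uid instead of inheriting the parent's
override def copy(extra: ParamMap): Cacher = {
  val fresh = new Cacher(Identifiable.randomUID("cacher"))
  copyValues(fresh, extra)
}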
Possible solutions:

- Don't use uid at all, or make the path dependent on the current set of parameters.
- Use the built-in caching mechanism (Dataset.persist, see the sketch below). It not only resolves the issue at hand, but also addresses a hidden issue, which is releasing resources on exit.
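As a sketch of that second option, the caching Transformer's transform could persist instead of round-tripping through S3 (the storage level here is a suggestion; you would still want to unpersist once the fitted model is no longer needed):

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.storage.StorageLevel

// inside Cacher: no uid-derived path, no S3 round trip
override def transform(dataset: Dataset[_]): DataFrame = {
  // spills to local disk if the joined DataFrame doesn't fit in memory
  dataset.toDF.persist(StorageLevel.MEMORY_AND_DISK)
}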