Understanding the role of UID in a Spark MLLib Transformer

I'm using Apache Spark with Scala to create an ML Pipeline. One of the Transformers in my pipeline does a costly join operation early on in the process. Since I have a lot of features in my ParamGrid, the program has to hold this enormous, joined DataFrame in memory while it optimizes over each feature in the grid.

To try to solve this problem, I created a custom Transformer that caches this large, intermediate DataFrame by writing it to a parquet file in S3 and returning a DataFrame read back from that parquet. This worked well and sped up the model until I added features to the ParamGrid that are staged before the caching stage. When I write the parquet to S3, I use a path determined by:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.util.Identifiable

class Cacher(override val uid: String) extends Transformer {

  // the cachePath variable determines the path within the S3 bucket
  lazy val cachePath = Identifiable.randomUID(uid + "transformer-cache")

  // ...

I think I am misunderstanding how uid works. My belief was that whenever Spark optimized over the ParamGrid, it took whatever classes were staged at that point in the pipeline, created new instances of them, and gave them new, unique uids to keep track of them. I suspect that the caching is going awry because Spark is not giving a unique uid to the new Transformer instances it creates, which means the cached parquet is continually overwritten whenever a new instance of the cache Transformer is created. Can anybody give any pointers on how to generate unique, random uids for each instance of a stage that the pipeline creates?

Cheers!

asked Nov 29 '16 by the-jackalope

1 Answer

Step by step:

  • uid is required by the Identifiable trait (Transformer extends PipelineStage which extends Params which extends Identifiable).
  • According to the Identifiable docs, uid is:

    An immutable unique ID for the object and its derivatives.

  • In general:

    • Params are mutable. Setting parameters returns this and doesn't affect uid.

      import org.apache.spark.ml.feature.OneHotEncoder

      val enc = new OneHotEncoder()
      val enc_ = enc.setInputCol("foo")

      enc == enc_
      // Boolean = true

      enc.uid == enc_.uid
      // Boolean = true

      
    • copying Params creates a new instance but keeps the same uid (note the "and its derivatives" part of the quote from the previous point).

      val encCopy = enc.copy(new org.apache.spark.ml.param.ParamMap())

      encCopy == enc
      // Boolean = false

      encCopy.uid == enc.uid
      // Boolean = true

      
  • You could try to override the copy method to avoid copying the parent uid, but that seems to conflict with the whole idea behind making Params Identifiable (see the sketch just after this list).
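
For illustration, here is a minimal sketch of what such an override could look like, assuming Spark 2.x APIs; the class mirrors the Cacher from the question, with the actual caching logic elided:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

class Cacher(override val uid: String) extends Transformer {

  def this() = this(Identifiable.randomUID("cacher"))

  // Give every copy a brand-new uid instead of inheriting the parent's,
  // so each instance created during grid search gets its own identity.
  // Note that this deliberately breaks the "object and its derivatives"
  // contract quoted above.
  override def copy(extra: ParamMap): Cacher =
    copyValues(new Cacher(Identifiable.randomUID("cacher")), extra)

  // caching logic elided
  override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF

  override def transformSchema(schema: StructType): StructType = schema
}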

Possible solutions:

  • Don't use the transformer uid at all, or make the path dependent on the current set of parameters (see the first sketch after this list).
  • Don't write cache files manually; use the built-in caching mechanism (Dataset.persist) instead. This not only resolves the issue at hand but also addresses a hidden one: releasing resources on exit (see the second sketch after this list).
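
A hypothetical sketch of the first option: derive the cache path from the stage's current parameter values instead of from uid, so identical configurations always map to the same location. The helper name and bucket here are placeholders, not anything from the question:

import org.apache.spark.ml.param.Params

// Hypothetical helper: build a deterministic cache path from a stage's
// current parameter values. The bucket name is a placeholder.
def cachePathFor(stage: Params): String = {
  val key = stage.extractParamMap().toSeq
    .map(pair => s"${pair.param.name}=${pair.value}")
    .sorted
    .mkString("|")
  s"s3://some-bucket/cache/${key.hashCode.toHexString}"
}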
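
And a minimal sketch of the second option (again assuming Spark 2.x APIs): a pass-through stage that persists its input rather than writing parquet files by hand. Because uid never enters the picture, it no longer matters how many copies the grid search creates:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel

class PersistingCacher(override val uid: String) extends Transformer {

  def this() = this(Identifiable.randomUID("persistingCacher"))

  // persist marks the Dataset for caching; Spark handles eviction and
  // releases the storage when the application shuts down.
  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.toDF.persist(StorageLevel.MEMORY_AND_DISK)

  override def transformSchema(schema: StructType): StructType = schema

  override def copy(extra: ParamMap): PersistingCacher = defaultCopy(extra)
}
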
answered Nov 16 '22 by zero323