 

Non linear (DAG) ML pipelines in Apache Spark

I've set up a simple Spark ML app, where I have a pipeline of independent transformers that add columns to a dataframe of raw data. Since the transformers don't look at one another's output, I was hoping I could run them in parallel in a non-linear (DAG) pipeline. All I could find about this feature is this paragraph from the Spark ML guide:

It is possible to create non-linear Pipelines as long as the data flow graph forms a Directed Acyclic Graph (DAG). This graph is currently specified implicitly based on the input and output column names of each stage (generally specified as parameters). If the Pipeline forms a DAG, then the stages must be specified in topological order.

My understanding of the paragraph is that if I set the inputCol(s) and outputCol parameters for each transformer and specify the stages in topological order when I create the pipeline, then the engine will use that information to build an execution DAG such that each stage of the DAG can run once its input is ready.

Some questions about that:

  • Is my understanding correct?
  • What happens if for one of the stages/transformers I don't specify an output column (e.g. the stage only filters some of the rows)? Will it assume that, for DAG-creation purposes, the stage changes all columns, so all subsequent stages should wait for it?
  • Likewise, what happens if for one of the stages I don't specify an inputCol(s)? Will the stage wait until all previous stages are complete?
  • It seems I can specify multiple input columns but only one output column. What happens if a transformer adds two columns to a dataframe (Spark itself has no problem with that)? Is there some way to let the DAG creation engine know about it?
asked May 31 '16 by hillel

1 Answer

Is my understanding correct?

Not exactly. Because the stages are provided in topological order, all you have to do to traverse the graph in the correct order is apply the PipelineStages from left to right. And this is exactly what happens when you call Pipeline.fit (and Transformer.transform on the resulting PipelineModel).

The sequence of stages is traversed twice:

  • once to validate the schema using transformSchema, which is simply implemented as stages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur)). This is the part where the actual schema validation is performed.
  • once to actually transform the data using Transformers and fit Estimators. This is just a simple loop which applies the stages sequentially, one by one.
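The two passes described above can be sketched in plain Python. The Stage class, schema representation, and method names below are simplified stand-ins for illustration, not the real Spark ML API:

```python
# Minimal sketch of how a linear pipeline traverses its stages twice.
# Stage, transform_schema, and transform are hypothetical stand-ins
# for PipelineStage, transformSchema, and transform.
from functools import reduce

class Stage:
    def __init__(self, name, input_cols, output_col):
        self.name = name
        self.input_cols = input_cols
        self.output_col = output_col

    def transform_schema(self, schema):
        # Analogous to PipelineStage.transformSchema: validate the
        # input columns, then describe the resulting schema.
        missing = [c for c in self.input_cols if c not in schema]
        if missing:
            raise ValueError(f"{self.name}: missing columns {missing}")
        return schema + [self.output_col]

    def transform(self, df):
        # Analogous to Transformer.transform: add the output column.
        return {**df, self.output_col: f"{self.name}({self.input_cols})"}

stages = [
    Stage("tokenizer", ["text"], "words"),
    Stage("hasher", ["words"], "features"),
]

# Pass 1: schema validation, a left fold over the stages.
schema = reduce(lambda cur, stage: stage.transform_schema(cur),
                stages, ["text"])

# Pass 2: sequential application, a simple loop.
df = {"text": "raw"}
for stage in stages:
    df = stage.transform(df)
```

Note that nothing in either pass inspects the column dependencies to build a DAG; topological ordering simply guarantees that this left-to-right traversal is valid.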

Likewise, what happens if for one of the stages I don't specify an inputCol(s)?

Pretty much nothing interesting. Since the stages are applied sequentially, and the only schema validation is performed by the given Transformer using its transformSchema method before the actual transformations begin, such a stage will be processed like any other stage.

What happens if a transformer adds two columns to a dataframe

Same as above. As long as it generates a valid input schema for the subsequent stages, it is no different from any other Transformer.
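Continuing the stand-in sketch from above (again, not the real Spark API): a stage can declare any number of output columns in its schema, and downstream stages only care that the schema they receive contains their inputs:

```python
# Hypothetical stand-in for a Transformer whose transformSchema
# declares two new columns instead of one.
class TwoColumnStage:
    def __init__(self, input_col, output_cols):
        self.input_col = input_col
        self.output_cols = output_cols  # e.g. ["mean", "stddev"]

    def transform_schema(self, schema):
        if self.input_col not in schema:
            raise ValueError(f"missing column {self.input_col}")
        # Both new columns become part of the schema seen by the
        # next stage in the sequence.
        return schema + self.output_cols

stage = TwoColumnStage("value", ["mean", "stddev"])
schema = stage.transform_schema(["id", "value"])
```

In real Spark code the same idea applies: a custom Transformer's transformSchema can return a StructType with several added fields, and the next stage validates against that combined schema.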

transformers don't look at the output of one another I was hoping I could run them in parallel

Theoretically, you could try to build a custom composite Transformer which encapsulates multiple different transformations, but the only part that could be performed independently and benefit from this type of operation is model fitting. At the end of the day you have to return a single transformed DataFrame which can be consumed by downstream stages, and the actual transformations are most likely scheduled as a single data scan anyway.
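The fitting part can be parallelized from the driver with ordinary threads, since a single SparkContext accepts job submissions from multiple threads. A hedged sketch of the idea, using dummy fit functions in place of Estimator.fit so it runs without Spark:

```python
# Sketch: fit several independent models concurrently.
# fit_model_a / fit_model_b are placeholders standing in for
# calls like estimator.fit(df) on independent Estimators.
from concurrent.futures import ThreadPoolExecutor

def fit_model_a(data):
    return ("model_a", sum(data))   # placeholder "model"

def fit_model_b(data):
    return ("model_b", max(data))   # placeholder "model"

data = [1, 2, 3]
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(f, data) for f in (fit_model_a, fit_model_b)]
    models = [f.result() for f in futures]
```

Even with concurrent fitting, the transformed DataFrames produced by the fitted models still have to be combined back into the single DataFrame that downstream stages consume.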

The question remains whether it is really worth the effort. While it is possible to execute multiple jobs at the same time, it provides an edge only if the amount of available resources is relatively high compared to the amount of work required to handle a single job. It usually requires some low-level management (number of partitions, number of shuffle partitions), which is not the strongest suit of Spark SQL.

answered Oct 10 '22 by zero323