Join two Spark MLlib pipelines together

I have two separate DataFrames, each of which goes through several different processing stages that I handle with MLlib transformers in a pipeline.

I now want to join these two pipelines together, keeping the features (columns) from each DataFrame.

Scikit-learn has the FeatureUnion class for handling this, but I can't seem to find an equivalent in MLlib.

I could add a custom transformer stage at the end of one pipeline that takes the DataFrame produced by the other pipeline as an attribute and joins it in the transform method, but that seems messy.

asked Mar 08 '23 by Anake


1 Answer

Both Pipeline and PipelineModel are valid PipelineStages, so either can be combined into a single Pipeline. For example, with:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])

pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

you can combine Pipelines:

Pipeline(stages=[
    pipeline1, pipeline2, 
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
+-----+---+---+---+---+---------+---------+-----------------+
|label|x1 |x2 |x3 |x4 |features1|features2|features         |
+-----+---+---+---+---+---------+---------+-----------------+
|1.0  |0  |1  |1  |0  |[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|0.0  |1  |0  |0  |1  |[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

or pre-fitted PipelineModels:

model1 = pipeline1.fit(df)
model2 = pipeline2.fit(df)

Pipeline(stages=[
    model1, model2, 
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
+-----+---+---+---+---+---------+---------+-----------------+
|label| x1| x2| x3| x4|features1|features2|         features|
+-----+---+---+---+---+---------+---------+-----------------+
|  1.0|  0|  1|  1|  0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|  0.0|  1|  0|  0|  1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

So the approach I would recommend is to join the data beforehand, then fit and transform the whole DataFrame.

See also:

  • Apache Spark add new fitted stage to an existing PipelineModel without fitting again
answered Mar 12 '23 by zero323