
Optimal way to create an ML pipeline in Apache Spark for a dataset with a high number of columns

I am working with Spark 2.1.1 on a dataset with ~2000 features and trying to create a basic ML Pipeline, consisting of some Transformers and a Classifier.

Let's assume for the sake of simplicity that the Pipeline I am working with consists of a VectorAssembler, a StringIndexer and a Classifier, which would be a fairly common use case.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

If the pipeline steps are separated into a transformer pipeline (VectorAssembler + StringIndexer) and a second classifier pipeline, and if the unnecessary columns are dropped in between both pipelines, training succeeds. This means that, to reuse the models, two PipelineModels have to be saved after training and an intermediary preprocessing step has to be introduced (a sketch of these extra steps follows the code below).

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)

// Transform data and drop all columns, except those needed for training
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]
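Reusing the two models later then looks roughly like the following; this is only a minimal sketch of the extra persistence and scoring steps (the save paths are placeholders, dfTest comes from the split shown in the preprocessing code further below):

import org.apache.spark.ml.PipelineModel

// Sketch only: both models have to be persisted separately (paths are placeholders)
prePipeline.write.overwrite().save("/models/prePipeline")
bestModel.write.overwrite().save("/models/mainPipeline")

// At scoring time both models are loaded and applied in sequence,
// and the intermediary column-dropping step has to be repeated in between
val preModel = PipelineModel.load("/models/prePipeline")
val mainModel = PipelineModel.load("/models/mainPipeline")

val dfNewT = preModel.transform(dfTest)
val dfNewRdy = dfNewT.drop(dfNewT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col)):_*)
val predictions = mainModel.transform(dfNewRdy)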

The (imho) much cleaner solution would be to merge all pipeline stages into one pipeline.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail!
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

However, putting all PipelineStages into one Pipeline leads to the following exception, probably due to the issue this PR will eventually solve:

ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF

The reason for this is that the VectorAssembler effectively doubles (in this example) the amount of data in the DataFrame, as there is no transformer that could drop the unnecessary columns. (See spark pipeline vector assembler drop other columns)

The example works on the golub dataset, and the following preprocessing steps are necessary:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

As I am new to Spark, I am not sure what would be the best way to solve this issue. Would you suggest...

  1. to create a new transformer that drops columns and can be incorporated into the pipeline? (A rough sketch of such a transformer is included below.)
  2. to split both Pipelines and introduce the intermediary step?
  3. anything else? :)

Or am I missing anything important (pipeline steps, PR, etc.) that would solve this issue?
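To illustrate option 1, this is roughly the kind of column-dropping transformer I have in mind. It is only a sketch (the name, the constructor, and the lack of Params/persistence support are all illustrative), not a finished implementation:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Sketch only: a transformer that keeps a fixed set of columns and drops everything else.
// It has no Params and no MLWritable support, so a fitted pipeline containing it cannot be saved.
class ColumnPruner(keep: Array[String]) extends Transformer {

  override val uid: String = Identifiable.randomUID("columnPruner")

  override def transform(dataset: Dataset[_]): DataFrame = {
    val toDrop = dataset.columns.filterNot(keep.contains)
    dataset.toDF.drop(toDrop: _*)
  }

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.filter(f => keep.contains(f.name)))

  override def copy(extra: ParamMap): ColumnPruner = new ColumnPruner(keep)
}

// It would then sit between the assembler and the classifier, e.g.:
// new Pipeline().setStages(Array(labelIndexer, assmbleFeatures,
//   new ColumnPruner(Array("featuresRaw", "indexedLabel")), rf))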


Edit:

I implemented a new Transformer, DroppingVectorAssembler, which drops unnecessary columns; however, the same exception is thrown.

Besides that, setting spark.sql.codegen.wholeStage to false does not solve the issue.
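For reference, the flag was set on the SparkSession like this (the exception is thrown either way):

// Disabling whole-stage code generation; in this case it did not avoid the janino error
spark.conf.set("spark.sql.codegen.wholeStage", "false")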





1 Answer

The janino error is due to the number of constant pool entries created during the code generation step. The JVM allows at most (2^16) - 1 (0xFFFF) constant pool entries per generated class. If this limit is exceeded, you get the Constant pool for class ... has grown past JVM limit of 0xFFFF error.

The JIRA that will fix this issue is SPARK-18016, but it's still in progress at this time.

Your code is most likely failing during the VectorAssembler stage, when it has to operate on thousands of columns within a single optimization task.

The workaround that I developed for this problem is to create a "vector of vectors" by working against subsets of the columns and then bringing the results together at the end to create a singular feature vector. This prevents any single optimization task from exceeding the JVM constant limit. It's not elegant, but I've used it on datasets reaching into the 10k columns range.

This method also allows you to still keep a single pipeline, though it requires some additional steps to make it work (creating the sub-vectors). After you've created the feature vector from the sub-vectors, you can drop the original source columns if desired.

Example Code:

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5",
        "col6", "col7", "col8", "col9", "colA", "colB",
        "colC", "colD", "colE", "colF", "colG", "colH",
        "colI", "colJ", "colK")

// Create multiple column lists using the sliding method
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features vector"
print(featureLength)
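As mentioned above, once the "features" vector has been assembled, the original source columns can be dropped if they are no longer needed. A short sketch continuing the example (column names as defined above):

// Drop the original source columns now that the "features" vector exists
val prunedDF = featuresDF.drop((colList1 ++ colList2 ++ colList3 ++ colList4): _*)

// The intermediate sub-vectors can be dropped as well, if they are no longer needed
val finalDF = prunedDF.drop("colList1_vec", "colList2_vec", "colList3_vec", "colList4_vec")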

(Note: The method of creating the column lists should really be done programmatically, but I've kept this example simple for the sake of understanding the concept.)
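A rough sketch of how the sub-assemblers could be generated programmatically, under the same assumptions as the example above (the group size of 5 and the sub-vector naming are illustrative):

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineStage}

// Split the feature columns into fixed-size groups (group size is illustrative)
val groupSize = 5
val columnGroups = exampleDF.columns.filter(_ != "uid").sliding(groupSize, groupSize).toArray

// One assembler per group, each producing an intermediate sub-vector
val subAssemblers: Array[PipelineStage] = columnGroups.zipWithIndex.map { case (cols, i) =>
  new VectorAssembler().setInputCols(cols).setOutputCol(s"subVec_$i"): PipelineStage
}

// Final assembler that merges the sub-vectors into the single "features" column
val featuresAssemblerAuto = new VectorAssembler()
  .setInputCols(columnGroups.indices.map(i => s"subVec_$i").toArray)
  .setOutputCol("features")

// Same single-pipeline structure as above, just built programmatically
val autoPipeline = new Pipeline().setStages(subAssemblers :+ featuresAssemblerAuto)
val autoFeaturesDF = autoPipeline.fit(exampleDF).transform(exampleDF)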
