 

Spark ML Pipeline with RandomForest takes too long on 20MB dataset

I am using Spark ML to run some ML experiments. On a small 20MB dataset (the Poker dataset), a Random Forest with a parameter grid takes 1 hour and 30 minutes to finish. The same experiment with scikit-learn takes much less time.

As for the environment, I was testing with 2 slaves, each with 15GB of memory and 24 cores. I assume it was not supposed to take that long, and I am wondering whether the problem lies in my code, since I am fairly new to Spark.

Here it is:

import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# The file has no header row; column names follow the UCI attribute list,
# with the class column named 'label' so the code below can reference it.
df = pd.read_csv("http://archive.ics.uci.edu/ml/machine-learning-databases/poker/poker-hand-testing.data",
                 header=None,
                 names=['S1', 'C1', 'S2', 'C2', 'S3', 'C3', 'S4', 'C4', 'S5', 'C5', 'label'])
dataframe = sqlContext.createDataFrame(df)

train, test = dataframe.randomSplit([0.7, 0.3])

columnTypes = dataframe.dtypes

categoricalCols = []
numericCols = []

for ct in columnTypes:
    if ct[1] == 'string' and ct[0] != 'label':
        categoricalCols += [ct[0]]
    elif ct[0] != 'label':
        numericCols += [ct[0]]

stages = []

for categoricalCol in categoricalCols:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    stages += [stringIndexer]

assemblerInputs = [c + "Index" for c in categoricalCols] + numericCols

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages += [assembler]

labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel', handleInvalid='skip')

stages += [labelIndexer]

estimator = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")

stages += [estimator]

parameters = {"maxDepth" : [3, 5, 10, 15], "maxBins" : [6, 12, 24, 32], "numTrees" : [3, 5, 10]}

paramGrid = ParamGridBuilder()
for key, value in parameters.items():
    paramGrid.addGrid(estimator.getParam(key), value)
estimatorParamMaps = paramGrid.build()

pipeline = Pipeline(stages=stages)

crossValidator = CrossValidator(estimator=pipeline,
                                estimatorParamMaps=estimatorParamMaps,
                                evaluator=MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                                                            predictionCol='prediction',
                                                                            metricName='f1'),
                                numFolds=3)

pipelineModel = crossValidator.fit(train)

predictions = pipelineModel.transform(test)

# Pipeline has no getEvaluator(); the evaluator belongs to the CrossValidator.
f1 = crossValidator.getEvaluator().evaluate(predictions)

Thanks in advance, any comments/suggestions are highly appreciated :)

Larissa Leite asked Jul 02 '17

1 Answer

The following may not solve your problem completely, but it should give you some pointers to start with.

The first problem you are facing is the mismatch between the amount of data and the resources you are throwing at it.

Since you are parallelizing a local collection (a pandas DataFrame), Spark uses the default parallelism configuration, which most likely results in 48 partitions with less than 0.5MB per partition. (Spark doesn't deal well with small files or tiny partitions.)
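
For instance, right after creating the DataFrame you can check how it was split up and collapse it into a handful of partitions. A minimal sketch, reusing the dataframe/train/test names from the question; the target of 4 partitions is just an assumption for data this small:

# How many partitions did the parallelized pandas DataFrame end up with?
print(dataframe.rdd.getNumPartitions())   # typically defaults to the total core count, e.g. 48

# Collapse the 20MB into a few larger partitions before any heavy work.
dataframe = dataframe.coalesce(4)
train, test = dataframe.randomSplit([0.7, 0.3])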

The second problem is related to the expensive optimization/approximation techniques used by tree models in Spark.

Spark's tree models use some tricks to bucket continuous variables optimally, mainly relying on approximate quantiles. With small data it is much cheaper to just compute the exact splits.

Usually, in a single-machine framework like scikit-learn, the tree model uses the unique values of each continuous feature as split candidates when searching for the best split, whereas in Apache Spark the tree model uses quantiles of each feature as split candidates.
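
To get a feel for the difference, here is a small, purely illustrative numpy sketch (random data, not Spark internals) comparing how many split candidates each strategy considers for one continuous feature:

import numpy as np

feature = np.random.rand(25010)                      # one continuous column, Poker-test sized

# scikit-learn style: every boundary between sorted unique values is a candidate
exact_candidates = np.unique(feature).size - 1

# Spark style: at most maxBins quantile boundaries are candidates
maxBins = 32
quantile_candidates = np.quantile(feature, np.linspace(0, 1, maxBins + 1))[1:-1]

print(exact_candidates, len(quantile_candidates))    # roughly 25009 vs 31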

Also, don't forget that cross validation is a heavy and long task, since its cost is proportional to the number of combinations of your 3 hyper-parameters, times the number of folds, times the time spent training each model (that is the grid search approach). You might want to cache your data for a start, but it still won't gain you much time. I believe Spark is overkill for this amount of data; you might want to use scikit-learn instead, and maybe use spark-sklearn to distribute the training of local models.
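
To put numbers on it: the grid above has 4 × 4 × 3 = 48 parameter combinations, so 3-fold cross validation fits the whole pipeline 144 times. A minimal sketch of the caching idea (DataFrame.cache() is standard Spark; on 20MB the gain will be modest):

# Keep the training data in memory so each of the 144 fits doesn't recompute it.
train = train.cache()
pipelineModel = crossValidator.fit(train)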

Spark will train each model separately and sequentially, under the assumption that the data is distributed and big.

You can of course optimize performance further by using columnar file formats like Parquet and by tuning Spark itself, but that is too broad to cover here.
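
For completeness, a minimal sketch of the Parquet idea (the path is a placeholder):

# Write the data once as Parquet, then read the columnar copy back in later runs.
dataframe.write.mode("overwrite").parquet("/tmp/poker.parquet")
dataframe = sqlContext.read.parquet("/tmp/poker.parquet")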

You can read more about the scalability of tree models in spark-mllib in the following blog post:

  • Scalable Decision Trees in MLlib

eliasah answered Oct 24 '22