I am using Spark ML to run some ML experiments, and on a small dataset of 20MB (Poker dataset) and a Random Forest with parameter grid, it takes 1h and 30 minutes to finish. Similarly with scikit-learn it takes much much less.
In terms of environment, I was testing with 2 slaves, 15GB memory each, 24 cores. I assume it was not supposed to take that long and I am wondering if the problem lies within my code, since I am fairly new to Spark.
Here it is:
df = pd.read_csv(http://archive.ics.uci.edu/ml/machine-learning-databases/poker/poker-hand-testing.data)
dataframe = sqlContext.createDataFrame(df)
train, test = dataframe.randomSplit([0.7, 0.3])
columnTypes = dataframe.dtypes
for ct in columnTypes:
if ct[1] == 'string' and ct[0] != 'label':
categoricalCols += [ct[0]]
elif ct[0] != 'label':
numericCols += [ct[0]]
stages = []
for categoricalCol in categoricalCols:
stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
stages += [stringIndexer]
assemblerInputs = map(lambda c: c + "Index", categoricalCols) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel', handleInvalid='skip')
stages += [labelIndexer]
estimator = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")
stages += [estimator]
parameters = {"maxDepth" : [3, 5, 10, 15], "maxBins" : [6, 12, 24, 32], "numTrees" : [3, 5, 10]}
paramGrid = ParamGridBuilder()
for key, value in parameters.iteritems():
paramGrid.addGrid(estimator.getParam(key), value)
estimatorParamMaps = (paramGrid.build())
pipeline = Pipeline(stages=stages)
crossValidator = CrossValidator(estimator=pipeline, estimatorParamMaps=estimatorParamMaps, evaluator=MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol='prediction', metricName='f1'), numFolds=3)
pipelineModel = crossValidator.fit(train)
predictions = pipelineModel.transform(test)
evaluator = pipeline.getEvaluator().evaluate(predictions)
Thanks in advance, any comments/suggestions are highly appreciated :)
Spark enhances machine learning because data scientists can focus on the data problems they really care about while transparently leveraging the speed, ease, and integration of Spark's unified platform.
Example of PySpark Logistic Regression Let's start by creating a simple data frame in PySpark on which we can use the model. We will be using the sc. Parallelize method to create the data frame followed by the toDF method in PySpark which contains the data needed for creating the Regression model.
class pyspark.ml. Pipeline (*, stages: Optional[List[PipelineStage]] = None)[source] A simple pipeline, which acts as an estimator. A Pipeline consists of a sequence of stages, each of which is either an Estimator or a Transformer .
The following may not solve your problem completely but it should give you some pointer to start.
The first problem that you are facing is the disproportion between the amount of data and the resources.
This means that since you are parallelizing a local collection (pandas dataframe), Spark will use the default parallelism configuration. Which is most likely to be resulting in 48
partitions with less than 0.5mb
per partition. (Spark doesn't do well with small files nor small partitions)
The second problem is related to expensive optimizations/approximations techniques used by Tree models in Spark.
Spark tree models use some tricks to optimally bucket continuous variables. With small data it is way cheaper to just get the exact splits. It mainly uses approximated quantiles in this case.
Usually, in a single machine framework scenario, like scikit
, the tree model uses unique feature values for continuous features as splits candidates for the best fit calculation. Whereas in Apache Spark, the tree model uses quantiles for each feature as a split candidate.
Also to add that you shouldn't forget as well that cross validation is a heavy and long tasks as it's proportional to the combination of your 3 hyper-parameters times the number of folds times the time spent to train each model (GridSearch approach). You might want to cache your data per example for a start but it will still not gain you much time. I believe that spark is an overkill for this amount of data. You might want to use scikit learn instead and maybe use spark-sklearn to distributed local model training.
Spark will learn each model separately and sequentially with the hypothesis that data is distributed and big.
You can of course optimize performance using columnar data based file formats like parquet and tuning spark itself, etc. it's too broad to talk about it here.
You can read more about tree models scalability with spark-mllib in this following blogpost :
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With