Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Apache Spark: Applying a function from sklearn parallel on partitions

Tags:

apache-spark

I'm new to Big Data and Apache Spark (and an undergrad doing work under a supervisor).

Is it possible to apply a function (i.e. a spline) to only partitions of the RDD? I'm trying to implement some of the work in the paper here.

The book "Learning Spark" seems to indicate that this is possible, but doesn't explain how.

"If you instead have many small datasets on which you want to train different learning models, it would be better to use a single- node learning library (e.g., Weka or SciKit-Learn) on each node, perhaps calling it in parallel across nodes using a Spark map()."

like image 791
NormallySane Avatar asked Mar 19 '26 20:03

NormallySane


1 Answers

Actually, we have a library which does exactly that. We have several sklearn transformators and predictors up and running. It's name is sparkit-learn.
From our examples:

from splearn.rdd import DictRDD  
from splearn.feature_extraction.text import SparkHashingVectorizer  
from splearn.feature_extraction.text import SparkTfidfTransformer  
from splearn.svm import SparkLinearSVC  
from splearn.pipeline import SparkPipeline  

from sklearn.feature_extraction.text import HashingVectorizer  
from sklearn.feature_extraction.text import TfidfTransformer  
from sklearn.svm import LinearSVC  
from sklearn.pipeline import Pipeline  

X = [...]  # list of texts  
y = [...]  # list of labels  
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parralelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

local_pipeline = Pipeline((
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', LinearSVC())
))
dist_pipeline = SparkPipeline((
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkLinearSVC())
))

local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))

y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])

You can find it here.

like image 56
fulibacsi Avatar answered Mar 21 '26 10:03

fulibacsi