PySpark: How to evaluate AUC of ML recommendation algorithm?

I have a Spark DataFrame as below:

predictions.show(5)
+------+----+------+-----------+
|  user|item|rating| prediction|
+------+----+------+-----------+
|379433|  31|     1| 0.08203495|
|  1834|  31|     1|  0.4854447|
|422635|  31|     1|0.017672742|
|   839|  31|     1| 0.39273006|
| 51444|  31|     1| 0.09795039|
+------+----+------+-----------+
only showing top 5 rows

Here prediction is the predicted rating and rating is the implicit rating (a count).
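
Note that AUC expects a binary label. The sample above only shows ratings of 1, but if the implicit counts can exceed 1 they would need binarizing first. A minimal sketch, assuming any positive count should count as a positive label:

from pyspark.sql import functions as F

# Hypothetical binarization step: any positive implicit count becomes label 1.0
predictions = predictions.withColumn(
    'label', F.when(F.col('rating') > 0, 1.0).otherwise(0.0))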

Now I want to check the AUC of my recommendation algorithm.

I first tried pyspark.ml.evaluation.BinaryClassificationEvaluator, since it works directly on the DataFrame.

# getting the evaluation metric

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
print evaluator.evaluate(predictions)

This gives me the following error:

---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-65-c642ea9c2cf5> in <module>()
      4 
      5 evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
----> 6 print evaluator.evaluate(predictions)
      7 
      8 #print evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})

/Users/i854319/spark/python/pyspark/ml/evaluation.py in evaluate(self, dataset, params)
     67                 return self.copy(params)._evaluate(dataset)
     68             else:
---> 69                 return self._evaluate(dataset)
     70         else:
     71             raise ValueError("Params must be a param map but got %s." % type(params))

/Users/i854319/spark/python/pyspark/ml/evaluation.py in _evaluate(self, dataset)
     97         """
     98         self._transfer_params_to_java()
---> 99         return self._java_obj.evaluate(dataset._jdf)
    100 
    101     def isLargerBetter(self):

/Users/i854319/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/Users/i854319/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     51                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     52             if s.startswith('java.lang.IllegalArgumentException: '):
---> 53                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     54             raise
     55     return deco

IllegalArgumentException: u'requirement failed: Column prediction must be of type org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually FloatType.'

So I then tried pyspark.mllib.evaluation.BinaryClassificationMetrics.

That needs an RDD of (score, label) pairs, so I mapped the last two columns of the same predictions DataFrame to tuples using the following mapper function:

### Creating an RDD of scores and labels from the validation dataset

def getScoresnLabels(x):
    """ This function takes a row of the validation or test dataset and maps its
    raw (predicted) and actual scores together as one (score, label) tuple.
    """
    data_row = x.asDict()
    ret_tuple = (data_row['prediction'], data_row['rating'])
    return ret_tuple


scoresnLabels = predictions.map(getScoresnLabels)

It looks like this:

scoresnLabels.take(5)
Out[81]:
[(0.08203495293855667, 1),
 (0.48544469475746155, 1),
 (0.017672741785645485, 1),
 (0.39273005723953247, 1),
 (0.09795039147138596, 1)]
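
For reference, a more compact way to build the same RDD (assuming the column names shown above) would be to select the two columns and cast both values to float, since the underlying Scala evaluator reads them as doubles:

# Equivalent construction; the float casts guard against integer labels
scoresnLabels = (predictions
                 .select('prediction', 'rating')
                 .rdd
                 .map(lambda row: (float(row['prediction']), float(row['rating']))))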

Then I use that with the following evaluator:

### Using the mllib evaluation metric 

from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(scoresnLabels)
metrics.areaUnderROC

But I get the following error:

Exception AttributeError: "'BinaryClassificationMetrics' object has no attribute '_sc'" in <bound method BinaryClassificationMetrics.__del__ of <pyspark.mllib.evaluation.BinaryClassificationMetrics object at 0x126483d50>> ignored
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-82-81c08d4e6f1d> in <module>()
      3 from pyspark.mllib.evaluation import BinaryClassificationMetrics
      4 metrics=BinaryClassificationMetrics(scoresnLabels)
----> 5 metrics.areaUnderROC

/Users/i854319/spark/python/pyspark/mllib/evaluation.py in areaUnderROC(self)
     60         (ROC) curve.
     61         """
---> 62         return self.call("areaUnderROC")
     63 
     64     @property

/Users/i854319/spark/python/pyspark/mllib/common.pyc in call(self, name, *a)
    144     def call(self, name, *a):
    145         """Call method of java_model"""
--> 146         return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
    147 
    148 

/Users/i854319/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
    121     """ Call Java Function """
    122     args = [_py2java(sc, a) for a in args]
--> 123     return _java2py(sc, func(*args))
    124 
    125 

/Users/i854319/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/Users/i854319/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/Users/i854319/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling o562.areaUnderROC.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1505.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1505.0 (TID 9224, localhost): java.lang.NullPointerException: Value at index 1 in null
    at org.apache.spark.sql.Row$class.getAnyValAs(Row.scala:475)
    at org.apache.spark.sql.Row$class.getDouble(Row.scala:243)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getDouble(rows.scala:192)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics$$anonfun$$init$$1.apply(BinaryClassificationMetrics.scala:61)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics$$anonfun$$init$$1.apply(BinaryClassificationMetrics.scala:61)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:264)
    at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:126)
    at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:62)
    at org.apache.spark.rdd.OrderedRDDFunctions$$anonfun$sortByKey$1.apply(OrderedRDDFunctions.scala:61)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.OrderedRDDFunctions.sortByKey(OrderedRDDFunctions.scala:61)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4$lzycompute(BinaryClassificationMetrics.scala:153)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$4(BinaryClassificationMetrics.scala:144)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:146)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:146)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:222)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:85)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:96)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: Value at index 1 in null
    at org.apache.spark.sql.Row$class.getAnyValAs(Row.scala:475)
    at org.apache.spark.sql.Row$class.getDouble(Row.scala:243)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getDouble(rows.scala:192)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics$$anonfun$$init$$1.apply(BinaryClassificationMetrics.scala:61)
    at org.apache.spark.mllib.evaluation.BinaryClassificationMetrics$$anonfun$$init$$1.apply(BinaryClassificationMetrics.scala:61)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

I am very confused now. My questions are:

  1. First, why are there two ML packages in Spark, and which is the right one to use? The two have different syntax, and there is none of the consistency in calling methods that scikit-learn offers.

  2. Secondly, why am I getting an error with both packages?

EDIT:

The ALS code used to get the predictions:

from pyspark.ml.recommendation import ALS

# Build the recommendation model using ALS on the training data
als = ALS(rank=120, maxIter=15, regParam=0.01, implicitPrefs=True)
model = als.fit(train)

predictions=model.transform(validation)
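
One thing worth checking here, as an assumption on my side (the trace does not confirm it): with implicitPrefs ALS, model.transform can produce NaN predictions for users or items in validation that never appear in train, and NaNs are a common source of downstream evaluation failures. A minimal guard is to drop such rows before evaluating:

# Drop rows whose prediction is null/NaN (e.g. cold-start users/items);
# newer Spark versions can do this via ALS(coldStartStrategy='drop')
predictions = predictions.na.drop(subset=['prediction'])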
asked Nov 01 '16 by Baktaawar

1 Answer

2) Secondly, why am I getting an error with both packages?

Well, many Spark versions later...

...and with Spark 2.4.5 your first example still throws an error, but it is now a little more helpful:

pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Column prediction must be of type equal to one of the following types: [double, struct<type:tinyint,size:int,indices:array<int>,values:array<double>>] but was actually of type float.'

Converting the prediction column to one of the accepted types works, e.g.:

from pyspark.sql.types import StructType, StructField, LongType, DoubleType

predictions = spark.createDataFrame(predictions.select('rating', 'prediction').rdd, schema=StructType(
    [StructField('rating', LongType()), StructField('prediction', DoubleType())]))
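
A lighter-weight alternative (my sketch, not part of the original answer) is to cast the column in place. Note also that BinaryClassificationEvaluator defaults to labelCol='label', so it has to be pointed at the rating column:

from pyspark.sql import functions as F
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Cast the float predictions to double so the evaluator accepts the column
predictions = predictions.withColumn('prediction', F.col('prediction').cast('double'))

# labelCol defaults to 'label', so point it at 'rating'
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='rating')
print(evaluator.evaluate(predictions))  # areaUnderROC by default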
answered Oct 20 '22 by Chris