PySpark reversing StringIndexer in nested array

I'm using PySpark to do collaborative filtering with ALS. My original user and item IDs are strings, so I used StringIndexer to convert them to numeric indices (PySpark's ALS implementation requires numeric IDs).

After I've fitted the model, I can get the top 3 recommendations for each user like so:

recs = (
    model
    .recommendForAllUsers(3)
)

The recs dataframe looks like so:

+-----------+--------------------+
|userIdIndex|     recommendations|
+-----------+--------------------+
|       1580|[[10096,3.6725707...|
|       4900|[[10096,3.0137873...|
|       5300|[[10096,2.7274625...|
|       6620|[[10096,2.4493625...|
|       7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows

root
 |-- userIdIndex: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productIdIndex: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

I want to create a huge JSON dump from this dataframe, which I can do like so:

(
    recs
    .toJSON()
    .saveAsTextFile("name_i_must_hide.recs")
)

and a sample of the resulting JSON records is:

{
  "userIdIndex": 1580,
  "recommendations": [
    {
      "productIdIndex": 10096,
      "rating": 3.6725707
    },
    {
      "productIdIndex": 10141,
      "rating": 3.61542
    },
    {
      "productIdIndex": 11591,
      "rating": 3.536216
    }
  ]
}

The userIdIndex and productIdIndex keys are due to the StringIndexer transformation.

How can I get the original values of these columns back? I suspect I must use the IndexToString transformer, but I can't quite figure out how, since the data is nested in an array inside the recs DataFrame.

I tried to use a Pipeline (stages=[StringIndexer, ALS, IndexToString]), but it looks like a pipeline doesn't support chaining these indexers this way.

Cheers!

Asked Oct 17 '22 by Daniel Severo

1 Answer

In both cases you'll need access to the list of labels. It can be obtained either from a StringIndexerModel

user_indexer_model = ...  # type: StringIndexerModel
user_labels = user_indexer_model.labels

product_indexer_model = ...  # type: StringIndexerModel
product_labels = product_indexer_model.labels
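The `labels` attribute is the ordered list the indexer learned when it was fit, so numeric index i maps back to labels[i]. A minimal plain-Python sketch of that reverse lookup (the label values here are made up for illustration):

```python
# Hypothetical label list, standing in for StringIndexerModel.labels.
# StringIndexer assigns indices by descending label frequency, so
# position i in the list is the original string for numeric index i.
user_labels = ["alice", "bob", "carol"]

def index_to_label(index, labels):
    """Reverse a StringIndexer mapping: numeric index -> original string."""
    return labels[int(index)]

print(index_to_label(1, user_labels))  # -> bob
```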

or column metadata.

For userIdIndex you can just apply IndexToString:

from pyspark.ml.feature import IndexToString

user_id_to_label = IndexToString(
    inputCol="userIdIndex", outputCol="userId", labels=user_labels)
user_id_to_label.transform(recs)

For recommendations you'll need either a udf or an expression like this:

from pyspark.sql.functions import array, col, lit, struct

n = 3  # Same as numItems

product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
    product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
    col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])

recs.withColumn("recommendations", recommendations)
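The udf alternative boils down to the same per-row lookup. Here is a plain-Python sketch of that logic (label values and indices are made up for the toy example), with the hypothetical Spark wiring shown in comments:

```python
# Plain-Python sketch of the per-row logic behind the expression above:
# each productIdIndex is replaced by product_labels[productIdIndex].
# The labels and indices below are made up for illustration.
product_labels = ["SKU-10096", "SKU-10141", "SKU-11591"]

def relabel_recommendations(recs, labels):
    """recs: list of (productIdIndex, rating) pairs -> (productId, rating)."""
    return [(labels[int(idx)], rating) for idx, rating in recs]

row = [(0, 3.6725707), (1, 3.61542), (2, 3.536216)]
print(relabel_recommendations(row, product_labels))

# In Spark this could be wrapped as a udf, e.g. (untested sketch):
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import (ArrayType, StructType, StructField,
#                                  StringType, FloatType)
#   schema = ArrayType(StructType([
#       StructField("productId", StringType()),
#       StructField("rating", FloatType())]))
#   relabel = udf(lambda recs: relabel_recommendations(
#       [(r["productIdIndex"], r["rating"]) for r in recs],
#       product_labels), schema)
#   recs.withColumn("recommendations", relabel("recommendations"))
```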
Answered Oct 21 '22 by zero323