UDF to map words to term Index in Spark

I am trying to get the corresponding topic words for the term IDs that I get from an LDA model.

Here is the data frame of topics and their word distributions from LDA in Spark:

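+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[0, 39, 68, 43, 5...|[0.06362107696025...|
+-----+--------------------+--------------------+
only showing top 1 row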

Since we have termIndices and not the actual words, I wanted to add another column to this data frame containing the words for the corresponding termIndices.

Since I ran CountVectorizer in Spark, I can use the fitted model to get the vocabulary as a list of words, as below.

# Creating Term Frequency Vector for each word
cv=CountVectorizer(inputCol="words", outputCol="tf_features", minDF=2.0)

cvModel.vocabulary gives the list of words.

So now here is a udf I wrote to get the mapping:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

def term_to_words(termindices):
    """To get the corresponding words from term indices."""
    return np.array(cvModel.vocabulary)[termindices]



I converted the list to a NumPy array because a NumPy array can be indexed with a list of indices, which a plain Python list cannot.

But I get the error below, and I am not sure why, since I am hardly doing anything here.

Py4JError: An error occurred while calling o443.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)


So I tried using a mapper function instead of a UDF:

def term_to_words(row):
    """Mapper function to get the corresponding words for the term index."""
    return (row['topic'], row['termIndices'], row['termWeights'], word_list[row['termIndices']])

/Users/spark2/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    931         # SparkContext#runJob.
    932         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 933         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    934         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))

AttributeError: 'NoneType' object has no attribute 'sc'
There are two different problems here:

  • The CountVectorizer model (cvModel) is a wrapper around a Java object. It cannot be serialized and passed with the closure, so you cannot reference it inside a UDF. For the same reason you cannot use it in a map closure.
  • You cannot return NumPy types from a UDF.
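The first point can be illustrated without Spark. The following is only a rough sketch: `FakeJvmModel` and `can_pickle` are hypothetical names, and the class merely simulates an object that refuses to be serialized (the real failure with Py4J looks different, as the traceback in the question shows). The idea it demonstrates is that the model object cannot travel to the workers, while the plain list it exposes can.

```python
import pickle


class FakeJvmModel:
    """Hypothetical stand-in for a Py4J-backed model such as cvModel."""

    def __init__(self, vocabulary):
        self.vocabulary = list(vocabulary)

    def __reduce__(self):
        # Assumption for illustration only: serialization is refused outright.
        raise pickle.PicklingError("wraps a JVM handle; cannot be pickled")


def can_pickle(obj):
    """Return True if obj can be serialized with pickle, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except pickle.PicklingError:
        return False


model = FakeJvmModel(["spark", "topic", "model"])
model_ok = can_pickle(model)             # the wrapper itself cannot be shipped
vocab_ok = can_pickle(model.vocabulary)  # a plain list of strings can
```

This is why it helps to extract `cvModel.vocabulary` on the driver first and let the function capture only that list.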

You can, for example, do this:

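from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def indices_to_terms(vocabulary):
    # vocabulary is a plain Python list, so it can be shipped with the closure
    def indices_to_terms(xs):
        return [vocabulary[int(x)] for x in xs]
    return udf(indices_to_terms, ArrayType(StringType()))

# `topics` is a placeholder name for the topics DataFrame shown in the question
topics.withColumn(
    "topics_words", indices_to_terms(cvModel.vocabulary)("termIndices"))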

If you want to use NumPy arrays, you'll have to call the tolist() method before returning from the UDF.
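A minimal sketch of that NumPy variant might look like the following. The name `make_indices_to_terms` and the sample vocabulary are made up for illustration; only the lookup function is shown, so the snippet runs without Spark.

```python
import numpy as np


def make_indices_to_terms(vocabulary):
    # Capture plain data (not the model object) in the closure.
    vocab = np.array(vocabulary)

    def indices_to_terms(xs):
        # Fancy indexing returns a NumPy array; tolist() converts it
        # back to a list of built-in Python strings.
        return vocab[np.asarray(xs, dtype=int)].tolist()

    return indices_to_terms


# Sample vocabulary (hypothetical); in the question it would be cvModel.vocabulary.
lookup = make_indices_to_terms(["spark", "topic", "model", "word"])
result = lookup([0, 2, 3])
```

To use it in Spark, the returned function would be wrapped the same way as above, for instance `udf(lookup, ArrayType(StringType()))`.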

