 

UDF to map words to term Index in Spark

I am trying to get the corresponding topic words for the term IDs that I get from the LDA model.

Here is the data frame of topics and their word distributions from LDA in Spark:

topics_desc=ldaModel.describeTopics(20)
topics_desc.show(1)
+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[0, 39, 68, 43, 5...|[0.06362107696025...|
+-----+--------------------+--------------------+
only showing top 1 row

Since the data frame contains termIndices and not the actual words, I wanted to add another column holding the words for the corresponding termIndices.

Since I ran CountVectorizer in Spark, I use that model to get the list of words, like below.

# Creating Term Frequency Vector for each word
cv=CountVectorizer(inputCol="words", outputCol="tf_features", minDF=2.0)
cvModel=cv.fit(swremoved_df)

cvModel.vocabulary gives the list of words.
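To illustrate the mapping with a made-up vocabulary (the real one depends on the fitted data): each entry in termIndices is a position into cvModel.vocabulary.

```python
# Hypothetical vocabulary list, standing in for cvModel.vocabulary:
vocabulary = ["data", "spark", "model", "topic", "word"]

# Each value in termIndices is a position in this list:
term_indices = [3, 0, 4]
print([vocabulary[i] for i in term_indices])  # ['topic', 'data', 'word']
```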

So now here is a udf I wrote to get the mapping:

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

def term_to_words(termindices):
    """Get the corresponding words for a list of term indices."""
    return np.array(cvModel.vocabulary)[termindices]

term_to_words_conv = udf(term_to_words)


topics = topics_desc.withColumn("topics_words", term_to_words_conv("termIndices"))

The reason I converted the list to a NumPy array is that a NumPy array can be indexed with a list of indices, which a plain Python list cannot.
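The indexing difference can be seen in plain Python, with a made-up vocabulary for illustration:

```python
import numpy as np

vocab = ["spark", "topic", "word", "model"]
indices = [2, 0, 3]

# A plain Python list rejects a list of indices:
try:
    vocab[indices]
except TypeError as e:
    print("list indexing failed:", e)

# A NumPy array supports "fancy indexing" with a list of positions:
print(np.array(vocab)[indices].tolist())  # ['word', 'spark', 'model']
```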

But I get the error below, which I don't understand, since I am hardly doing anything here.

Py4JError: An error occurred while calling o443.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

Edit:

So I thought of using a mapper function instead of a udf:

import numpy as np

def term_to_words(x):
    """Mapper function to get the corresponding words for the term indices."""
    row = x.asDict()
    word_list = np.array(cvModel.vocabulary)
    return (row['topic'], row['termIndices'], row['termWeights'],
            word_list[row['termIndices']])


topics_rdd = topics_desc.rdd.map(term_to_words)

But this fails as well:
/Users/spark2/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    931         # SparkContext#runJob.
    932         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 933         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    934         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    935 

AttributeError: 'NoneType' object has no attribute 'sc'
Baktaawar asked Feb 16 '17

1 Answer

There are two different problems here:

  • CountVectorizer is a wrapper around a Java object. It cannot be serialized and passed with the closure; for the same reason you cannot use it in a map closure.
  • You cannot return NumPy types from a UDF.

You can, for example:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def indices_to_terms(vocabulary):
    def indices_to_terms(xs):
        return [vocabulary[int(x)] for x in xs]
    return udf(indices_to_terms, ArrayType(StringType()))

Usage:

topics_desc.withColumn(
    "topics_words", indices_to_terms(cvModel.vocabulary)("termIndices"))

If you want to use NumPy arrays, you'll have to call the tolist() method before returning from the UDF.
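A minimal sketch of that conversion outside Spark (the udf wrapping is omitted, and vocabulary here is a stand-in for cvModel.vocabulary):

```python
import numpy as np

def indices_to_terms_np(vocabulary):
    vocab_arr = np.array(vocabulary)  # capture the plain list, not the Java-backed model
    def lookup(xs):
        # Fancy-index, then convert the ndarray to a plain list of Python strings
        return vocab_arr[xs].tolist()
    return lookup  # in Spark, wrap as: udf(lookup, ArrayType(StringType()))

lookup = indices_to_terms_np(["alpha", "beta", "gamma"])
print(lookup([2, 0]))  # ['gamma', 'alpha']
```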

zero323 answered Sep 20 '22