I am trying to get the correspond topic words for the term ID which I get from LDA model.
Here is the data frame of topics and it's word distribution from LDA in Spark
topics_desc=ldaModel.describeTopics(20)
topics_desc.show(1)
+-----+--------------------+--------------------+
|topic| termIndices| termWeights|
+-----+--------------------+--------------------+
| 0|[0, 39, 68, 43, 5...|[0.06362107696025...|
+-----+--------------------+--------------------+
only showing top 1 row
Now since we have termIndices and not the actual words, I wanted to add another column to this data frame which would be the words for the corresponding termIndices.
Now since I ran the CountVectorizer
in Spark, I use that model and get the list of words array like below.
# Creating Term Frequency Vector for each word
cv=CountVectorizer(inputCol="words", outputCol="tf_features", minDF=2.0)
cvModel=cv.fit(swremoved_df)
cvModel.vocabulary
gives the list of words.
So now here is a udf I wrote to get the mapping:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType
def term_to_words(termindices):
""" To get the corresponding words from term indices
"""
return np.array(cvModel.vocabulary)[termindices]
term_to_words_conv=udf(term_to_words)
topics=topics_desc.withColumn("topics_words",term_to_words_conv("termIndices"))
The reason I converted the list to np array because in numpy array I can index by passing a lift of indices which one can't do that in a list.
But I get this error. Which I am not sure why is the case as I am hardly doing anything here.
Py4JError: An error occurred while calling o443.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Edit:
So I thought of using mapper function instead of udf
def term_to_words(x):
""" Mapper function to get the corresponding words for the term index
"""
row=x.asDict()
word_list=np.array(cvModel.vocabulary)
return (row['topic'],row['termIndices'],row['termWeights'],word_list[row[termindices]])
topics_rdd=topics_desc.rdd.map(term_to_words)
/Users/spark2/python/pyspark/context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
931 # SparkContext#runJob.
932 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 933 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
934 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
935
AttributeError: 'NoneType' object has no attribute 'sc'
There are two different problems here:
CountVectorizer
is a wrapper for Java object. It cannot be serialized and passed with the closure. For the same reason you cannot use it in map
closure.You can for example:
from pyspark.sql.types import ArrayType, StringType
def indices_to_terms(vocabulary):
def indices_to_terms(xs):
return [vocabulary[int(x)] for x in xs]
return udf(indices_to_terms, ArrayType(StringType()))
Usage:
topics_desc.withColumn(
"topics_words", indices_to_terms(cvModel.vocabulary)("termIndices"))
If you want to use NumPy arrays you'll have use tolist()
method before returning from the UDF.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With