pyLDAvis visualization of pyspark generated LDA model

Does anyone have an example of data visualization of an LDA model trained using the PySpark library (specifically using pyLDAvis)? I've seen a lot of examples for GenSim and other libraries but not PySpark. Specifically I'm wondering what to pass into the pyLDAvis.prepare() function and how to get it from my lda model. Here is my code:

 from pyspark.mllib.clustering import LDA, LDAModel
 from pyspark.mllib.feature import IDF
 from pyspark.ml.feature import CountVectorizer
 from pyspark.mllib.linalg import Vectors

 vectorizer = CountVectorizer(inputCol="filtered1", outputCol="features").fit(filtered_final)
 countVectors = vectorizer.transform(filtered_final).select("status_id", "features")
 frequencyVectors = countVectors.rdd.map(lambda vector: vector[1])
 frequencyDenseVectors = frequencyVectors.map(lambda vector: Vectors.dense(vector))
 idf = IDF().fit(frequencyDenseVectors)
 print('fitting complete')
 tfidf = idf.transform(frequencyDenseVectors)
 print("tf idf complete")
 #prepare corpus for LDA
 corpus = tfidf.map(lambda x: [1, x]).cache()
 #train LDA
 ldaModel = LDA.train(corpus, k = 15, maxIterations=100, optimizer="online", docConcentration=2.0, topicConcentration=3.0)
 print("lda model complete")
1 Answers

I have somehow managed to fit the output of pyspark to pyLDAvis.
The following code needs a little cleaning but it works.

from pyspark.ml.feature import StopWordsRemover,Tokenizer, RegexTokenizer, CountVectorizer, IDF
from pyspark.sql.functions import udf, col, size, explode, regexp_replace, trim, lower, lit
from pyspark.sql.types import ArrayType, StringType, DoubleType, IntegerType, LongType
from pyspark.ml.clustering import LDA
import pyLDAvis

def format_data_to_pyldavis(df_filtered, count_vectorizer, transformed, lda_model):
    xxx = df_filtered.select((explode(df_filtered.words_filtered)).alias("words")).groupby("words").count()
    word_counts = {r['words']:r['count'] for r in xxx.collect()}
    word_counts = [word_counts[w] for w in count_vectorizer.vocabulary]

    data = {'topic_term_dists': np.array(lda_model.topicsMatrix().toArray()).T, 
            'doc_topic_dists': np.array([x.toArray() for x in transformed.select(["topicDistribution"]).toPandas()['topicDistribution']]),
            'doc_lengths': [r[0] for r in df_filtered.select(size(df_filtered.words_filtered)).collect()],
            'vocab': count_vectorizer.vocabulary,
            'term_frequency': word_counts}

    return data

def filter_bad_docs(data):
    bad = 0
    doc_topic_dists_filtrado = []
    doc_lengths_filtrado = []

    for x,y in zip(data['doc_topic_dists'], data['doc_lengths']):
        if np.sum(x)==0:
        elif np.sum(x) != 1:
        elif np.isnan(x).any():

    data['doc_topic_dists'] = doc_topic_dists_filtrado
    data['doc_lengths'] = doc_lengths_filtrado

# This is the only part that you have to implement:
create a Spark Dataframe named df_filtered and it has the list of raw words.
It can be the output of StopWordsRemover

count_vectorizer = CountVectorizer(inputCol="words_filtered", outputCol="features", minDF=0.05, maxDF=0.5)
count_vectorizer = count_vectorizer.fit(df_filtered)
df_counted = count_vectorizer.transform(df_filtered)

idf = IDF(inputCol="features", outputCol="features_tfidf")
idf_model = idf.fit(df_counted)
df_tfidf = idf_model.transform(df_counted)

lda = LDA(k=2, maxIter=20, featuresCol='features_tfidf')
lda_model = lda.fit(df_tfidf)
transformed = lda_model.transform(df_tfidf)

data = format_data_to_pyldavis(df_filtered, count_vectorizer, transformed, lda_model)
filter_bad_docs(data) # this is, because for some reason some docs apears with 0 value in all the vectors, or the norm is not 1, so I filter those docs.
py_lda_prepared_data = pyLDAvis.prepare(**data)
