Adding the resulting TF-IDF calculation to the DataFrame of the original documents in PySpark

I am using Spark MLlib to calculate the sum of all terms' TF-IDF values for each document (each document is described by a row of a DataFrame). I wrote the following code:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.linalg import SparseVector

sc = SparkContext() 
sqlContext = SQLContext(sc)

#SECTION 1
documents = sqlContext.createDataFrame([
    (0, "hello spark", "data1"),
    (1, "this is example", "data2"),
    (2, "spark is fast","data3"),
    (3, "hello world","data4")], ["doc_id", "doc_text", "another"])

#SECTION 2
documents.registerTempTable("doc_table")
textcolumn = sqlContext.sql("SELECT doc_text FROM doc_table")
doc_words = textcolumn.map(lambda d: d.doc_text).map(lambda t: t.split(" "))

#SECTION 3
hashingTF = HashingTF()
tf = hashingTF.transform(doc_words).cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf).cache()

#SECTION 4
sumrdd = tfidf.map(lambda v: v.values.sum())
print('\n Summation of TFIDF for each document:')
print(sumrdd.collect())

I got the following result:

[1.0216512475319814, 2.3434070875143007, 1.9379419794061366, 1.4271163556401458]

Can anybody help me change the code so that the original data (doc_id, doc_text, another) stays linked to each document throughout the tf, idf and tfidf calculations? I have thousands of rows in the DataFrame, so I must be sure that each document is correctly connected to its TF-IDF sum. Finally, I would like to get a result (or a DataFrame) like this:

(0, "hello spark", "data1", 1.0216512475319814)
(1, "this is example", "data2", 2.3434070875143007)
(2, "spark is fast","data3",1.9379419794061366)
(3, "hello world","data4",1.4271163556401458)
asked Mar 03 '16 by K.Ali

1 Answer

Solution 1:

This is not the best solution ever, but the main idea is to use spark-ml (the DataFrame-based API) rather than mllib, so that the information in your DataFrame is kept:

# I used alias to avoid confusion with the mllib library
from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.ml.feature import IDF as MLIDF
from pyspark.sql.types import DoubleType

documents = sqlContext.createDataFrame([
    (0, "hello spark", "data1"),
    (1, "this is example", "data2"),
    (2, "spark is fast","data3"),
    (3, "hello world","data4")], ["doc_id", "doc_text", "another"])

documents.printSchema()
# root
# |-- doc_id: long (nullable = true)
# |-- doc_text: string (nullable = true)
# |-- another: string (nullable = true)

# Note: this keeps only doc_id and the tokenized text, dropping doc_text
# and another; see the sketch after the output below for carrying them along.
df = (documents
  .rdd
  .map(lambda x: (x.doc_id, x.doc_text.split(" ")))
  .toDF()
  .withColumnRenamed("_1", "doc_id")
  .withColumnRenamed("_2", "features"))

htf = MLHashingTF(inputCol="features", outputCol="tf")
tf = htf.transform(df)
tf.show(truncate=False)
# +------+-------------------+------------------------------------------+
# |doc_id|features           |tf                                        |
# +------+-------------------+------------------------------------------+
# |0     |[hello, spark]     |(262144,[62173,71890],[1.0,1.0])          |
# |1     |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|
# |2     |[spark, is, fast]  |(262144,[3370,62173,251996],[1.0,1.0,1.0])|
# |3     |[hello, world]     |(262144,[71890,72594],[1.0,1.0])          |
# +------+-------------------+------------------------------------------+

idf = MLIDF(inputCol="tf", outputCol="idf")
tfidf = idf.fit(tf).transform(tf)
tfidf.show(truncate=False)
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |doc_id|features           |tf                                        |idf                                                                                    |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |0     |[hello, spark]     |(262144,[62173,71890],[1.0,1.0])          |(262144,[62173,71890],[0.5108256237659907,0.5108256237659907])                         |
# |1     |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|(262144,[3370,69994,151198],[0.5108256237659907,0.9162907318741551,0.9162907318741551])|
# |2     |[spark, is, fast]  |(262144,[3370,62173,251996],[1.0,1.0,1.0])|(262144,[3370,62173,251996],[0.5108256237659907,0.5108256237659907,0.9162907318741551])|
# |3     |[hello, world]     |(262144,[71890,72594],[1.0,1.0])          |(262144,[71890,72594],[0.5108256237659907,0.9162907318741551])                         |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+

res = tfidf.rdd.map(lambda x: (x.doc_id, x.features, x.tf, x.idf,
                               (None if x.idf is None else x.idf.values.sum())))

for r in res.take(10):
    print(r)

# (0, [u'hello', u'spark'], SparseVector(262144, {62173: 1.0, 71890: 1.0}), SparseVector(262144, {62173: 0.5108, 71890: 0.5108}), 1.0216512475319814)
# (1, [u'this', u'is', u'example'], SparseVector(262144, {3370: 1.0, 69994: 1.0, 151198: 1.0}), SparseVector(262144, {3370: 0.5108, 69994: 0.9163, 151198: 0.9163}), 2.3434070875143007)
# (2, [u'spark', u'is', u'fast'], SparseVector(262144, {3370: 1.0, 62173: 1.0, 251996: 1.0}), SparseVector(262144, {3370: 0.5108, 62173: 0.5108, 251996: 0.9163}), 1.9379419794061366)
# (3, [u'hello', u'world'], SparseVector(262144, {71890: 1.0, 72594: 1.0}), SparseVector(262144, {71890: 0.5108, 72594: 0.9163}), 1.4271163556401458)
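
Note that the `df` built above keeps only `doc_id` and the tokenized text, while the question also wants `doc_text` and `another` in the final result. Since the ML transformers only read the `features` column, a minimal variation of the same pipeline (my sketch, not part of the original answer; the `df_full`/`tf_full`/`tfidf_full` names are mine) can carry all three original columns along:

# Sketch: assumes the same session as above, with `documents`,
# MLHashingTF and MLIDF already imported/defined.
df_full = (documents
  .rdd
  .map(lambda x: (x.doc_id, x.doc_text, x.another, x.doc_text.split(" ")))
  .toDF(["doc_id", "doc_text", "another", "features"]))

tf_full = MLHashingTF(inputCol="features", outputCol="tf").transform(df_full)
tfidf_full = MLIDF(inputCol="tf", outputCol="idf").fit(tf_full).transform(tf_full)

# Emit exactly the tuple shape requested in the question.
res_full = tfidf_full.rdd.map(lambda x: (x.doc_id, x.doc_text, x.another,
                                         float(x.idf.values.sum())))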

Solution 2:

You might also consider using a UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

sum_ = udf(lambda v: float(v.values.sum()), DoubleType())
tfidf.withColumn("idf_sum", sum_("idf")).show()

## +------+-------------------+--------------------+--------------------+------------------+
## |doc_id|           features|                  tf|                 idf|           idf_sum|
## +------+-------------------+--------------------+--------------------+------------------+
## |     0|     [hello, spark]|(262144,[62173,71...|(262144,[62173,71...|1.0216512475319814|
## |     1|[this, is, example]|(262144,[3370,699...|(262144,[3370,699...|2.3434070875143007|
## |     2|  [spark, is, fast]|(262144,[3370,621...|(262144,[3370,621...|1.9379419794061366|
## |     3|     [hello, world]|(262144,[71890,72...|(262144,[71890,72...|1.4271163556401458|
## +------+-------------------+--------------------+--------------------+------------------+
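
To get back to the exact shape the question asked for, including the `another` column, one more option (again my sketch, assuming the `documents` and `tfidf` DataFrames defined above) is to keep just the sums and join them onto the original DataFrame by `doc_id`:

# Sketch: join the per-document sums back onto the original columns.
sums = tfidf.withColumn("idf_sum", sum_("idf")).select("doc_id", "idf_sum")
documents.join(sums, "doc_id").show()
# resulting columns: doc_id, doc_text, another, idf_sum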
answered Oct 22 '22 by eliasah