I am using Spark MLlib to calculate the sum of all terms' TF-IDF values for each document (each document is described by one row of a DataFrame). I wrote the following code:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.linalg import SparseVector
sc = SparkContext()
sqlContext = SQLContext(sc)
# SECTION 1: build the source DataFrame
documents = sqlContext.createDataFrame([
    (0, "hello spark", "data1"),
    (1, "this is example", "data2"),
    (2, "spark is fast", "data3"),
    (3, "hello world", "data4")], ["doc_id", "doc_text", "another"])
# SECTION 2: extract the text column and split each document into words
documents.registerTempTable("doc_table")
textcolumn = sqlContext.sql("SELECT doc_text FROM doc_table")
doc_words = textcolumn.map(lambda d: d.doc_text).map(lambda t: t.split(" "))
# SECTION 3: compute term frequencies, then TF-IDF
hashingTF = HashingTF()
tf = hashingTF.transform(doc_words).cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf).cache()
# SECTION 4: sum the TF-IDF values in each document's sparse vector
sumrdd = tfidf.map(lambda v: v.values.sum())
print('\n Summation of TFIDF for each document:')
print(sumrdd.collect())
I got the following result:
[1.0216512475319814, 2.3434070875143007, 1.9379419794061366, 1.4271163556401458]
Can anybody help me change the code so that the original data (doc_id, doc_text, another) stays linked to each document through the tf, idf and tfidf calculations? I have thousands of rows in the DataFrame and I must be sure that each document is correctly connected to its TF-IDF sum. Finally, I would like a result (or a DataFrame) like this:
(0, "hello spark", "data1", 1.0216512475319814)
(1, "this is example", "data2", 2.3434070875143007)
(2, "spark is fast","data3",1.9379419794061366)
(3, "hello world","data4",1.4271163556401458)
Solution 1:
This is not the best solution ever, but the main idea is to use spark-ml instead of mllib, because the DataFrame-based API keeps the information of your DataFrame attached to each row:
# I used aliases to avoid confusion with the mllib library
from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.ml.feature import IDF as MLIDF
from pyspark.sql.types import DoubleType
documents = sqlContext.createDataFrame([
    (0, "hello spark", "data1"),
    (1, "this is example", "data2"),
    (2, "spark is fast", "data3"),
    (3, "hello world", "data4")], ["doc_id", "doc_text", "another"])
documents.printSchema()
# root
# |-- doc_id: long (nullable = true)
# |-- doc_text: string (nullable = true)
# |-- another: string (nullable = true)
df = (documents
      .rdd
      .map(lambda x: (x.doc_id, x.doc_text.split(" ")))
      .toDF()
      .withColumnRenamed("_1", "doc_id")
      .withColumnRenamed("_2", "features"))
htf = MLHashingTF(inputCol="features", outputCol="tf")
tf = htf.transform(df)
tf.show(truncate=False)
# +------+-------------------+------------------------------------------+
# |doc_id|features |tf |
# +------+-------------------+------------------------------------------+
# |0 |[hello, spark] |(262144,[62173,71890],[1.0,1.0]) |
# |1 |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|
# |2 |[spark, is, fast] |(262144,[3370,62173,251996],[1.0,1.0,1.0])|
# |3 |[hello, world] |(262144,[71890,72594],[1.0,1.0]) |
# +------+-------------------+------------------------------------------+
idf = MLIDF(inputCol="tf", outputCol="idf")
tfidf = idf.fit(tf).transform(tf)
tfidf.show(truncate=False)
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |doc_id|features |tf |idf |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
# |0 |[hello, spark] |(262144,[62173,71890],[1.0,1.0]) |(262144,[62173,71890],[0.5108256237659907,0.5108256237659907]) |
# |1 |[this, is, example]|(262144,[3370,69994,151198],[1.0,1.0,1.0])|(262144,[3370,69994,151198],[0.5108256237659907,0.9162907318741551,0.9162907318741551])|
# |2 |[spark, is, fast] |(262144,[3370,62173,251996],[1.0,1.0,1.0])|(262144,[3370,62173,251996],[0.5108256237659907,0.5108256237659907,0.9162907318741551])|
# |3 |[hello, world] |(262144,[71890,72594],[1.0,1.0]) |(262144,[71890,72594],[0.5108256237659907,0.9162907318741551]) |
# +------+-------------------+------------------------------------------+---------------------------------------------------------------------------------------+
res = tfidf.rdd.map(lambda x: (x.doc_id, x.features, x.tf, x.idf,
                               (None if x.idf is None else x.idf.values.sum())))
for r in res.take(10):
    print(r)
# (0, [u'hello', u'spark'], SparseVector(262144, {62173: 1.0, 71890: 1.0}), SparseVector(262144, {62173: 0.5108, 71890: 0.5108}), 1.0216512475319814)
# (1, [u'this', u'is', u'example'], SparseVector(262144, {3370: 1.0, 69994: 1.0, 151198: 1.0}), SparseVector(262144, {3370: 0.5108, 69994: 0.9163, 151198: 0.9163}), 2.3434070875143007)
# (2, [u'spark', u'is', u'fast'], SparseVector(262144, {3370: 1.0, 62173: 1.0, 251996: 1.0}), SparseVector(262144, {3370: 0.5108, 62173: 0.5108, 251996: 0.9163}), 1.9379419794061366)
# (3, [u'hello', u'world'], SparseVector(262144, {71890: 1.0, 72594: 1.0}), SparseVector(262144, {71890: 0.5108, 72594: 0.9163}), 1.4271163556401458)
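If you also need doc_text and another in the final result, one possible follow-up (a sketch that assumes no document has a null idf vector; sums and joined are illustrative names) is to turn the per-document sums back into a DataFrame and join on doc_id:
# Keep only (doc_id, tfidf_sum) and join back to the original DataFrame
# so that doc_text and another are carried along.
sums = res.map(lambda x: (x[0], float(x[4]))).toDF(["doc_id", "tfidf_sum"])
joined = documents.join(sums, "doc_id")
joined.show(truncate=False)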
Solution 2:
You might also consider using a UDF to compute the sum directly as a new DataFrame column:
from pyspark.sql.functions import udf
sum_ = udf(lambda v: float(v.values.sum()), DoubleType())
tfidf.withColumn("idf_sum", sum_("idf")).show()
## +------+-------------------+--------------------+--------------------+------------------+
## |doc_id| features| tf| idf| idf_sum|
## +------+-------------------+--------------------+--------------------+------------------+
## | 0| [hello, spark]|(262144,[62173,71...|(262144,[62173,71...|1.0216512475319814|
## | 1|[this, is, example]|(262144,[3370,699...|(262144,[3370,699...|2.3434070875143007|
## | 2| [spark, is, fast]|(262144,[3370,621...|(262144,[3370,621...|1.9379419794061366|
## | 3| [hello, world]|(262144,[71890,72...|(262144,[71890,72...|1.4271163556401458|
## +------+-------------------+--------------------+--------------------+------------------+
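To get exactly the four-column shape asked for in the question, a hedged variant of the above (result is an illustrative name, and the string-column join assumes Spark 1.5+) is to select the sum, join back to documents on doc_id, and reorder the columns:
result = (tfidf
          .withColumn("idf_sum", sum_("idf"))
          .select("doc_id", "idf_sum")
          .join(documents, "doc_id")
          .select("doc_id", "doc_text", "another", "idf_sum"))
result.show(truncate=False)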