I'm trying to compute pointwise mutual information (PMI).
I have two RDDs as defined here for p(x, y) and p(x) respectively:
pii: RDD[((String, String), Double)]
pi: RDD[(String, Double)]
Any code I'm writing to compute PMI from the RDDs pii and pi is not pretty. My approach is first to flatten the RDD pii and join with pi twice while massaging the tuple elements.
val pmi = pii.map(x => (x._1._1, (x._1._2, x._1, x._2)))
  .join(pi).values
  .map(x => (x._1._1, (x._1._2, x._1._3, x._2)))
  .join(pi).values
  .map(x => (x._1._1, computePMI(x._1._2, x._1._3, x._2)))
// pmi: org.apache.spark.rdd.RDD[((String, String), Double)]
...
import scala.math.log

def computePMI(pab: Double, pa: Double, pb: Double) = {
  // handle boundary conditions, etc
  log(pab) - log(pa) - log(pb)
}
Clearly, this sucks. Is there a better (idiomatic) way to do this?
Note: I could optimize the logs by storing the log-probs in pi and pii, but I'm choosing to write it this way to keep the question clear.
The general formula for all versions of pointwise mutual information is PMI(a, b) = log2( p(a, b) / (p(a) p(b)) ): the binary logarithm of the joint probability of X = a and Y = b divided by the product of the individual probabilities of X = a and Y = b.
PMI(x, y) = 0 means that the particular values of x and y are statistically independent; positive PMI means they co-occur more frequently than would be expected under an independence assumption, and negative PMI means they co-occur less frequently than would be expected.
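As a small worked illustration of the formula and of the sign interpretation, here is a minimal sketch in plain Scala. The names PmiExample, log2 and pmi2 and the probabilities are made up for the example; it uses log base 2, whereas computePMI in the question uses the natural log, which only changes the scale of the result, not its sign.

```scala
object PmiExample {
  // log2(x) = ln(x) / ln(2); hypothetical helper, not from the original post
  private def log2(x: Double): Double = math.log(x) / math.log(2.0)

  // PMI(a, b) = log2( p(a, b) / (p(a) * p(b)) )
  // Assumes all three probabilities are strictly positive.
  def pmi2(pab: Double, pa: Double, pb: Double): Double =
    log2(pab / (pa * pb))

  def main(args: Array[String]): Unit = {
    // Illustrative probabilities only
    println(pmi2(0.25, 0.5, 0.5)) // independent: joint equals the product
    println(pmi2(0.40, 0.5, 0.5)) // positive: co-occur more than expected
    println(pmi2(0.10, 0.5, 0.5)) // negative: co-occur less than expected
  }
}
```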
What is pointwise mutual information? PMI helps us find related words. In other words, it measures how much more likely two words are to co-occur than we would expect by chance. For example, the phrase "Data Science" has a specific meaning when the two words "Data" and "Science" appear together.
Normalized pointwise mutual information (NPMI): pointwise mutual information can be normalized to the range [-1, +1], resulting in -1 (in the limit) for never occurring together, 0 for independence, and +1 for complete co-occurrence.
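A minimal sketch of that normalization, assuming the common definition NPMI = PMI / -log p(x, y). The names NpmiExample, pmi and npmi are hypothetical and the inputs are illustrative. The degenerate case p(x, y) = 1 is not handled here.

```scala
object NpmiExample {
  // Natural-log PMI, the same formula as computePMI in the question
  def pmi(pxy: Double, px: Double, py: Double): Double =
    math.log(pxy) - math.log(px) - math.log(py)

  // NPMI = PMI / -log p(x, y), assuming 0 <= pxy < 1 and px, py > 0
  def npmi(pxy: Double, px: Double, py: Double): Double =
    if (pxy <= 0.0) -1.0 // limit value for pairs that never occur together
    else pmi(pxy, px, py) / -math.log(pxy)

  def main(args: Array[String]): Unit = {
    println(npmi(0.25, 0.5, 0.5)) // independence
    println(npmi(0.2, 0.2, 0.2))  // x and y always occur together
    println(npmi(0.0, 0.5, 0.5))  // never occur together
  }
}
```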
Using broadcast would be a solution.
// Collect the marginals to the driver and ship one copy to each executor
val bcPi = pi.context.broadcast(pi.collectAsMap())
val pmi = pii.map {
  case ((x, y), pxy) =>
    (x, y) -> computePMI(pxy, bcPi.value(x), bcPi.value(y))
}
Assume: pi has all x and y in pii.
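If that assumption might not hold, one option is to look the marginals up as Options and drop pairs that cannot be scored. The following is only a sketch: SafePmi and pmiFor are hypothetical names, the guard against non-positive probabilities is one possible reading of "handle boundary conditions", and a local Map and Seq stand in for the broadcast value and pii so the example runs without a cluster.

```scala
object SafePmi {
  // Same formula as computePMI in the question (natural log)
  def computePMI(pab: Double, pa: Double, pb: Double): Double =
    math.log(pab) - math.log(pa) - math.log(pb)

  // Returns None when a marginal is missing or any probability is not
  // positive, so a flatMap drops the pair instead of throwing.
  def pmiFor(marginals: scala.collection.Map[String, Double])(
      entry: ((String, String), Double)): Option[((String, String), Double)] = {
    val ((x, y), pxy) = entry
    for {
      px <- marginals.get(x)
      py <- marginals.get(y)
      if pxy > 0.0 && px > 0.0 && py > 0.0
    } yield (x, y) -> computePMI(pxy, px, py)
  }

  def main(args: Array[String]): Unit = {
    // Local stand-ins for pi and pii; values are illustrative only
    val marginals = Map("a" -> 0.5, "b" -> 0.5)
    val joint = Seq((("a", "b"), 0.25), (("a", "z"), 0.1))
    // With Spark this would be: pii.flatMap(e => pmiFor(bcPi.value)(e))
    joint.flatMap(e => pmiFor(marginals)(e)).foreach(println)
  }
}
```

In the Spark version, reading bcPi.value inside the lambda keeps the lookup on the broadcast copy rather than capturing the whole map in the task closure. The broadcast approach itself still requires pi to fit in memory on the driver and on each executor.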