I have a fairly simple use case, but a potentially very large result set. My code does the following (in the pyspark shell):
from pyspark.mllib.fpm import FPGrowth
data = sc.textFile("/Users/me/associationtestproject/data/sourcedata.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.000001, numPartitions=1000)
# Perform any RDD operation
for item in model.freqItemsets().toLocalIterator():
    pass  # do something with item
I find that whenever I kick off the actual processing by calling either count() or toLocalIterator(), the operation ultimately ends with an out-of-memory error. Is FPGrowth not partitioning my data? Is my result data so big that fetching even a single partition exhausts my memory? If so, is there a way I can persist an RDD to disk in a "streaming" fashion without trying to hold it in memory?
Thanks for any insights.
Edit: A fundamental limitation of FPGrowth is that the entire FP Tree has to fit in memory. So, the suggestions about raising the minimum support threshold are valid.
-Raj
Well, the problem is most likely the support threshold. When you set a very low value like this one (I wouldn't call one-in-a-million frequent), you basically throw away all the benefits of the downward-closure property.
It means that the number of itemsets to consider grows exponentially, and in the worst case it equals 2^N - 1, where N is the number of items. Unless you have toy data with a very small number of items, it is simply not feasible.
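To get a feel for how quickly that worst case grows, here is a minimal pure-Python sketch (not Spark code) that counts every non-empty itemset over N items by brute-force enumeration. The helper name `count_nonempty_itemsets` is made up for illustration.

```python
from itertools import combinations


def count_nonempty_itemsets(items):
    """Count every non-empty subset of `items` by explicit enumeration."""
    items = list(items)
    return sum(
        1
        for k in range(1, len(items) + 1)
        for _ in combinations(items, k)
    )


# Enumeration is only practical for small N; it matches 2**N - 1.
for n in (3, 10, 16):
    assert count_nonempty_itemsets(range(n)) == 2 ** n - 1
    print(n, count_nonempty_itemsets(range(n)))

# For larger N, use the closed form; enumerating is already hopeless.
for n in (30, 50, 100):
    print(n, 2 ** n - 1)
```

Even at N = 30 the worst case is already over a billion candidate itemsets, which is why pruning via a meaningful support threshold matters.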
Edit:
Note that with ~200K transactions (information taken from the comments) and a support threshold of 1e-6, every itemset in your data has to be frequent: 200,000 × 1e-6 = 0.2, so an itemset that occurs in just one transaction already meets the threshold. So basically what you're trying to do here is enumerate all observed itemsets.
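The arithmetic can be checked with a small pure-Python sketch. It assumes the absolute threshold is the relative support times the number of transactions, rounded up; the helpers `min_count` and `frequent_itemsets` are hypothetical names, and the brute-force miner is only a toy stand-in for FPGrowth, not how Spark does it.

```python
import math
from collections import Counter
from itertools import combinations


def min_count(min_support, num_transactions):
    """Absolute threshold, assuming ceil(min_support * num_transactions)."""
    return math.ceil(min_support * num_transactions)


def frequent_itemsets(transactions, min_support):
    """Brute-force toy miner: count every subset of every transaction."""
    threshold = min_count(min_support, len(transactions))
    counts = Counter()
    for transaction in transactions:
        items = sorted(set(transaction))
        for k in range(1, len(items) + 1):
            for itemset in combinations(items, k):
                counts[itemset] += 1
    return {s: c for s, c in counts.items() if c >= threshold}


# With ~200K transactions and minSupport=1e-6, one occurrence is enough.
print(min_count(1e-6, 200_000))

# Toy data: at such a low threshold, every observed itemset is kept.
toy = [["a", "b"], ["b", "c"]]
print(sorted(frequent_itemsets(toy, 1e-6)))
# At a high threshold, only items present in every transaction survive.
print(sorted(frequent_itemsets(toy, 1.0)))
```

With the threshold at one occurrence, nothing is pruned, so the output size is bounded only by the number of distinct subsets that appear in the data.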