I have a Spark DataFrame with two columns, ID and Items, where Items is a list of strings.
My goal is to find frequent subsets of the itemsets in my data. I am familiar with Apriori and FP-growth and used the latter to find frequent itemsets. But I have one more requirement: in cases where multiple itemsets along a path in the FP-tree have the same support, I only want to keep the largest itemset. In my example, ["a"] has the same support as ["a", "b"], so I am only interested in keeping the latter.
Even that is in principle not a problem; the simplest solution is grouping itemsets by support, selecting the one with the most items, and deleting all subsets of that set. My problem is that the number of possible combinations is extremely large: I have 60 different items and itemsets of up to 50 items.
So my question is: is there any way to make this more efficient?
Here is a basic example of what I am initially doing:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType, IntegerType, StructType, StructField
basic_data = {100: [['a', 'b']],
              101: [['a', 'b', 'c']],
              102: [['a', 'b', 'c', 'd']],
              103: [['a', 'b', 'c']],
              104: [['a', 'b']],
              105: [['a', 'b']],
              106: [['c', 'e']],
              107: [['c', 'e']],
              108: [['c', 'e']],
              }
data = [(key, value[0]) for key, value in basic_data.items()]
schema = ['id', 'items']
df = spark.createDataFrame(data, schema=schema)
fpGrowth = FPGrowth(itemsCol='items', minSupport=0.3)
model = fpGrowth.fit(df)
# Display frequent itemsets
frequentItemsets = model.freqItemsets
frequentItemsets.show()
Which gives me this result:
+---------+----+
| items|freq|
+---------+----+
| [a]| 6|
| [b]| 6|
| [b, a]| 6|
| [c]| 6|
| [c, b]| 3|
|[c, b, a]| 3|
| [c, a]| 3|
| [e]| 3|
| [e, c]| 3|
+---------+----+
I then group the data by frequency:
df = frequentItemsets.groupBy('freq').agg(F.collect_list('items').alias('items_list'))
df.show(truncate=False)
+----+----------------------------------------+
|freq|items_list |
+----+----------------------------------------+
|6 |[[b], [b, a], [a], [c]] |
|3 |[[c, b], [c, b, a], [c, a], [e], [e, c]]|
+----+----------------------------------------+
Next, for each frequency, I remove all itemsets that are subsets of another itemset with the same frequency. For instance, I remove [b] and [a] from the first row because they are both subsets of [b, a]. My code for this is not the best, but I hope you get the idea:
def getlongestitem(itemlist: list) -> int:
    '''Return the index of the longest itemset in itemlist (-1 if all are empty).'''
    longestlen = 0
    longestitem_index = -1
    for i in range(len(itemlist)):
        if len(itemlist[i]) > longestlen:
            longestlen = len(itemlist[i])
            longestitem_index = i
    return longestitem_index

def find_subsets(itemlist: list, index: int) -> list:
    '''
    Find all itemsets in itemlist that are subsets of itemlist[index].
    Return a list of indexes of the subsets.
    '''
    subset_indexes = []
    longestitem = itemlist[index]
    for i in range(len(itemlist)):
        if all(element in longestitem for element in itemlist[i]):
            subset_indexes.append(i)
    return subset_indexes

def extract_items(itemlist: list) -> list:
    '''
    Find the largest itemset in itemlist and all itemsets that are subsets of it.
    Mark the largest itemset and all its subsets as processed. Repeat until every
    itemset is processed. Return the list of the largest itemsets.
    '''
    items = []
    processed_indices = set()  # Track indices of itemsets already covered by a chosen maximal itemset
    while len(processed_indices) < len(itemlist):
        longestitem_index = getlongestitem(itemlist)
        while longestitem_index in processed_indices:
            itemlist[longestitem_index] = []  # Blank out processed itemsets so they are not picked again
            longestitem_index = getlongestitem(itemlist)
        subset_indexes = find_subsets(itemlist, longestitem_index)
        items.append(itemlist[longestitem_index])
        processed_indices.update(subset_indexes)
    return items
extract_items_udf = udf(extract_items, ArrayType(ArrayType(StringType())))
df_with_items = df.withColumn("extracted_items", extract_items_udf(df["items_list"]))
result_df = df_with_items.select(df_with_items["freq"], explode(df_with_items["extracted_items"]).alias("items"))
result_df.show(truncate=False)
The result then looks like this:
+----+---------+
|freq|items |
+----+---------+
|3 |[c, b, a]|
|3 |[e, c] |
|6 |[b, a] |
|6 |[c] |
+----+---------+
Now that is all fine and runs - on a small dataset. But my issue is that the number of items I have is around 60, and thus the number of combinations is extremely large. FP-growth delivered results, but the next step, removing those subsets, crashes because I run out of memory. I tried batching this by looking at one frequency at a time, but even then there are so many itemsets with the same frequency that I cannot process them well.
Starting from the DataFrame named frequentItemsets, you can probably try a self left_anti join. This way there is no need to move all items with the same value of freq to the same partition (using groupBy + collect_list), which is what potentially leads to the OOM issue.
# add a column to calculate the number of array elements for items
frequentItemsets = frequentItemsets.withColumn('sz', F.size('items'))
result_df = (frequentItemsets.alias('f1')
    .join(
        frequentItemsets.alias('f2'),
        F.expr("""
            f1.freq = f2.freq AND
            f1.sz < f2.sz AND
            size(array_intersect(f1.items, f2.items)) = f1.sz
        """),
        "left_anti"
    )
)
result_df.show()
+---------+----+---+
| items|freq| sz|
+---------+----+---+
| [b, a]| 6| 2|
| [c]| 6| 1|
|[c, b, a]| 3| 3|
| [e, c]| 3| 2|
+---------+----+---+
Here the following condition checks whether the array f1.items is a proper subset of f2.items with the same freq; the left-anti join then drops those rows:
f1.sz < f2.sz AND size(array_intersect(f1.items, f2.items)) = f1.sz
Note: for the array_intersect function to work here, the items column must not contain duplicate array elements, which I believe applies to your task.
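As a quick sanity check, here is a minimal sketch of the subset test on literal arrays; the demo DataFrame, its column names and the sample arrays are made up for illustration, and it assumes the same active SparkSession named spark and the F alias from above:
# Hypothetical arrays just to illustrate the subset test used in the join condition:
# ['b', 'a'] is a subset of ['c', 'b', 'a'], so the intersection keeps all of its elements.
demo = spark.createDataFrame([(['b', 'a'], ['c', 'b', 'a'])], ['small', 'big'])
demo.select(
    (F.size(F.array_intersect('small', 'big')) == F.size('small')).alias('is_subset')
).show()
This should print a single row with is_subset = true. If duplicates could occur in items, one way to remove them beforehand is F.array_distinct, e.g. frequentItemsets.withColumn('items', F.array_distinct('items')).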