I know there are a thousand questions out there related to how best to partition your DataFrames or RDDs by salting keys, etc., but I think this situation is different enough to warrant its own question.
I am building a collaborative filtering recommender engine in PySpark, which means each user's (row's) unique item ratings need to be compared. So, for a DataFrame of dimensions M (rows) x N (columns), this means the dataset becomes M x (K choose 2), where K << N is the number of non-null (i.e., rated) elements for a user.
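To see why a handful of heavy raters dominates, here is a quick back-of-the-envelope check (my own illustrative numbers, not from my dataset) of how (K choose 2) grows with K:

from math import comb

# per-user pairwise comparisons grow quadratically in K, so a few
# "power raters" account for most of the total work
for k in (10, 100, 1000):
    print(k, comb(k, 2))
# 10 -> 45, 100 -> 4950, 1000 -> 499500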
My algorithm works quite well and efficiently for datasets where users have rated an approximately uniform number of items. However, for situations where a subset of users have rated a lot of items (orders of magnitude more than other users in the same partition), my data becomes extremely skewed and the last few partitions begin to take large amounts of time. Consider, for a simple example, the following DataFrame:
cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']

ratings = [
    (1, 4.5, 3.5, None, 1.0, None),   # user 1
    (2, 2.0, None, 5.0, 4.0, 3.0),    # user 2
    (3, 3.5, 5.0, 1.0, None, 1.0),    # user 3
    (4, None, None, 4.5, 3.5, 4.0),   # user 4
    (5, None, None, None, None, 4.5)  # user 5
]

R = sc.parallelize(ratings, 2).toDF(cols)
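As a sanity check, you can peek at how many rows land in each partition of this toy frame (the exact split comes from parallelize's even slicing, so yours may differ):

R.rdd.glom().map(len).collect()
# [2, 3]  -- rows per partition for 5 rows in 2 slices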
My situation presents itself in a larger variation of this DataFrame (~1,000,000 users and ~10k items), where some users have rated a much larger proportion of the movies than others. Initially, I sparsify my DataFrame as follows:
def _make_ratings(row):
    import numpy as np
    non_null_mask = ~np.isnan(row)
    idcs = np.where(non_null_mask)[0]  # extract the non-null indices

    # zip the non-null idcs with the corresponding ratings
    rtgs = row[non_null_mask]
    return list(zip(idcs, rtgs))


def as_array(partition):
    import numpy as np
    for row in partition:
        yield _make_ratings(np.asarray(row, dtype=np.float32))


# drop the id column, get the RDD, and map each row to its
# list of (item index, rating) tuples
ratings = R.drop('id').rdd\
    .mapPartitions(as_array)\
    .cache()
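For the toy frame above, the first element of `ratings` should look roughly like this (column positions are relative to the frame after dropping `id`; the actual values are NumPy scalars):

ratings.take(1)
# [[(0, 4.5), (1, 3.5), (3, 1.0)]]  # user 1 rated columns 0, 1, and 3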
Then, I can examine the number of mutual ratings pairs required for each partition in the following manner:
n_choose_2 = (lambda itrbl: (len(itrbl) * (len(itrbl) - 1)) / 2.)
sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True)
Initially, this was the distribution of mutual ratings pairs per partition I got:
As you can see, that's just not scalable. So my first attempt to fix this was to partition my DataFrame more intelligently at the source. I came up with the following function, which partitions my DataFrame rows randomly:
def shuffle_partition(X, n_partitions, col_name='shuffle'):
    from pyspark.sql.functions import rand
    X2 = X.withColumn(col_name, rand())
    return X2.repartition(n_partitions, col_name).drop(col_name)
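Applying it to the toy frame looks something like this (two partitions is just an illustrative choice):

R_shuffled = shuffle_partition(R, n_partitions=2)
R_shuffled.rdd.getNumPartitions()  # 2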
This worked, kind of. And after applying it, here's the new distribution:
This definitely scales much better, but still not to my liking. There must be a way to distribute these "power raters" more uniformly across partitions, but I just can't figure it out. I've been thinking about partitioning by a column of "rating count per user," but that would ultimately lump all of the high-rating-count users together, as opposed to splitting them up.
Am I missing something obvious?
I implemented igrinis' solution in the following function (I'm sure there's a more elegant way to write this, but I'm not super familiar with the DataFrame API, so I went back to RDDs for this; critiques welcome), but the distribution was roughly the same as the original, so I'm not sure whether I did something incorrectly:
def partition_by_rating_density(X, id_col_name, n_partitions,
                                partition_col_name='partition'):
    """Segment partitions by rating density. Partitions will be more
    evenly distributed based on the number of ratings for each user.

    Parameters
    ----------
    X : PySpark DataFrame
        The ratings matrix

    id_col_name : str
        The ID column name

    n_partitions : int
        The number of partitions in the new DataFrame.

    partition_col_name : str
        The name of the partitioning column

    Returns
    -------
    with_partition_key : PySpark DataFrame
        The partitioned DataFrame
    """
    ididx = X.columns.index(id_col_name)

    def count_non_null(row):
        sm = sum(1 if v is not None else 0
                 for i, v in enumerate(row) if i != ididx)
        return row[ididx], sm

    # map each row to (id, rating count), then sort by descending count
    counted = X.rdd.map(count_non_null)\
        .sortBy(lambda r: r[-1], ascending=False)

    # zip with the sorted index and assign each id to a partition
    # via index % n_partitions
    indexed = counted.zipWithIndex()\
        .map(lambda ti: (ti[0][0], ti[1] % n_partitions))\
        .toDF([id_col_name, partition_col_name])

    # join the partition assignments back onto the original frame,
    # repartition on the new column, then drop it
    with_partition_key = X.join(indexed, on=id_col_name, how='inner')
    return with_partition_key.repartition(n_partitions, partition_col_name)\
        .drop(partition_col_name)
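For completeness, a call on the toy frame would look like this (the column name and partition count come from the example above):

R_balanced = partition_by_rating_density(R, id_col_name='id', n_partitions=2)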
Solution:
What you can do is get a list of users sorted by their number of ratings, take each user's position (index) in that sorted list, and divide it by the number of partitions. Keep the remainder of that division as a column, and then repartition on that column. This way your partitions will have an almost equal representation of all user rating counts.
For 3 partitions this will get you:
[1000, 800, 700, 600, 200, 30, 10, 5] - number of ratings
[ 0, 1, 2, 3, 4, 5, 6, 7] - position in sorted index
[ 0, 1, 2, 0, 1, 2, 0, 1] - group to partition by
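This isn't the answer's code verbatim, but a minimal DataFrame-API sketch of the same idea (the function name partition_by_rank and the global window are my own choices; note that a window with no partitionBy routes all rows through a single task while the row numbers are assigned):

from pyspark.sql import Window
from pyspark.sql import functions as F

def partition_by_rank(X, id_col_name, n_partitions, partition_col_name='partition'):
    # count each user's non-null ratings across all non-id columns
    rating_cols = [c for c in X.columns if c != id_col_name]
    n_rated = sum(F.col(c).isNotNull().cast('int') for c in rating_cols)

    # rank users by rating count (descending) and assign partition
    # (rank - 1) % n_partitions, as in the example above
    w = Window.orderBy(F.desc('n_rated'))
    assigned = (X.withColumn('n_rated', n_rated)
                 .withColumn(partition_col_name,
                             (F.row_number().over(w) - 1) % n_partitions))

    return (assigned.repartition(n_partitions, partition_col_name)
                    .drop('n_rated', partition_col_name))

One design note: repartition() hash-partitions on the column values, so with only n_partitions distinct group labels a couple of groups can still land in the same partition; the result is close to even in practice but not guaranteed perfectly balanced.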