Uniformly partition PySpark Dataframe by count of non-null elements in row

I know there are a thousand questions out there related to how best to partition your DataFrames or RDDs by salting keys, etc., but I think this situation is different enough to warrant its own question.

I am building a collaborative filtering recommender engine in PySpark, which means each user's (row's) non-null item ratings need to be compared pairwise. So, for a DataFrame of dimensions M (rows) x N (columns), the workload becomes M x (K choose 2), where K << N is the number of non-null (i.e., rated) elements for a user.
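
To put concrete numbers on that growth (the item counts below are purely illustrative):

n_pairs = lambda k: k * (k - 1) // 2
n_pairs(50)    # 1,225 comparisons for a user with 50 rated items
n_pairs(5000)  # 12,497,500 comparisons for a user with 5,000 rated items (~10,000x the work)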

My algorithm works quite well and efficiently for datasets where users have rated an approximately uniform number of items. However, when a subset of users has rated far more items (orders of magnitude more than other users in the same partition), my data becomes extremely skewed and the last few partitions start to take a very long time. Consider, for a simple example, the following DataFrame:

cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
ratings = [
    (1, 4.5,  3.5,  None, 1.0,  None),  # user 1
    (2, 2.0,  None, 5.0,  4.0,  3.0),   # user 2
    (3, 3.5,  5.0,  1.0,  None, 1.0),   # user 3
    (4, None, None, 4.5,  3.5,  4.0),   # user 4
    (5, None, None, None, None, 4.5)    # user 5
]

R = sc.parallelize(ratings, 2).toDF(cols)

My real situation is a much larger version of this DataFrame (~1,000,000 users and ~10k items), where some users have rated a far larger proportion of the movies than others. To start, I sparsify my DataFrame as follows:

def _make_ratings(row):
    import numpy as np
    non_null_mask = ~np.isnan(row)
    idcs = np.where(non_null_mask)[0]  # indices of the non-null entries

    # zip the non-null idcs with the corresponding ratings
    rtgs = row[non_null_mask]
    return list(zip(idcs, rtgs))


def as_array(partition):
    import numpy as np
    for row in partition:
        yield _make_ratings(np.asarray(row, dtype=np.float32))


# drop the id column, get the RDD, and map each row to its (index, rating) pairs
ratings = R.drop('id').rdd\
           .mapPartitions(as_array)\
           .cache()
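
For the toy DataFrame above, each element of ratings is then a list of (column index, rating) pairs. Ignoring the NumPy scalar types, user 1's row comes out roughly as:

# columns after dropping 'id': ['Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
[(0, 4.5), (1, 3.5), (3, 1.0)]  # user 1 rated 'Toy Story', 'UP', and 'MIB'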

Then, I can examine the number of mutual ratings pairs required for each partition in the following manner:

n_choose_2 = (lambda itrbl: (len(itrbl) * (len(itrbl) - 1)) / 2.)
sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True)

Initially, this was the distribution of mutual ratings pairs per partition I got:

[Image: distribution of mutual rating pairs per partition with the original partitioning]

As you can see, that's just not scalable. So my first attempt to fix this was to partition my DataFrame more intelligently at the source. I came up with the following function, which partitions my DataFrame rows randomly:

def shuffle_partition(X, n_partitions, col_name='shuffle'):
    from pyspark.sql.functions import rand
    X2 = X.withColumn(col_name, rand())
    return X2.repartition(n_partitions, col_name).drop(col_name)
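
Applied before the sparsifying step, that looks something like the following (the partition count here is arbitrary for the toy example):

R_shuffled = shuffle_partition(R, n_partitions=2)
ratings = R_shuffled.drop('id').rdd.mapPartitions(as_array).cache()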

This worked, kind of. And after applying it, here's the new distribution:

[Image: distribution of mutual rating pairs per partition after random shuffle partitioning]

This definitely scales much better, but still not to my liking. There must be a way to distribute these "power raters" more uniformly across partitions, but I just can't figure it out. I've been thinking about partitioning by a column of "rating count per user," but that would ultimately lump all of the high-rating-count users together, as opposed to splitting them up.

Am I missing something obvious?

Update

I implemented igrinis' solution in the following function (I'm sure there's a more elegant way to write this, but I'm not super familiar with the DataFrame API, so I went back to RDDs for this; critiques welcome). However, the resulting distribution was roughly the same as the original, so I'm not sure whether I did something incorrectly:

def partition_by_rating_density(X, id_col_name, n_partitions,
                                partition_col_name='partition'):
    """Segment partitions by rating density. Partitions will be more
    evenly distributed based on the number of ratings for each user.

    Parameters
    ----------
    X : PySpark DataFrame
        The ratings matrix

    id_col_name : str
        The ID column name

    n_partitions : int
        The number of partitions in the new DataFrame.

    partition_col_name : str
        The name of the partitioning column

    Returns
    -------
    with_partition_key : PySpark DataFrame
        The partitioned DataFrame
    """
    ididx = X.columns.index(id_col_name)

    def count_non_null(row):
        sm = sum(1 if v is not None else 0
                 for i, v in enumerate(row) if i != ididx)
        return row[ididx], sm

    # add the count as the last element and id as the first
    counted = X.rdd.map(count_non_null)\
               .sortBy(lambda r: r[-1], ascending=False)

    # zip each (id, count) with its position in the sorted (descending) order,
    # then map each user id to (position % n_partitions) as its partition group
    indexed = counted.zipWithIndex()\
                     .map(lambda ti: (ti[0][0], ti[1] % n_partitions))\
                     .toDF([id_col_name, partition_col_name])

    # join back with indexed, which now has the partition column
    counted_indexed = X.join(indexed, on=id_col_name, how='inner')

    # repartition on the assigned partition group, then drop the helper column
    return counted_indexed.repartition(n_partitions, partition_col_name)\
        .drop(partition_col_name)
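
For reference, the same distribution check from earlier, run against the output of this function (the argument values here are just illustrative):

R_balanced = partition_by_rating_density(R, id_col_name='id', n_partitions=2)
pairs = R_balanced.drop('id').rdd.mapPartitions(as_array).cache()
sorted(pairs.map(n_choose_2).glom().map(sum).collect(), reverse=True)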

TayTay



1 Answer

What you can do is get a list of users sorted by their number of ratings, take each user's position in that sorted list, and divide it by the number of partitions. Keep the remainder of that division as a column, and then repartition with partitionBy() on that column. This way your partitions will have an almost equal representation of users across the whole range of rating counts.

For 3 partitions this will get you:

[1000, 800, 700, 600, 200, 30, 10, 5] - number of ratings
[   0,   1,   2,   3,   4,  5,  6, 7] - position in sorted index
[   0,   1,   2,   0,   1,  2,  0, 1] - group to partition by

igrinis
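
For reference, a rough sketch of this approach with the DataFrame API (this is not code from the original answer; the helper name and the n_ratings/partition column names are placeholders):

from pyspark.sql import functions as F
from pyspark.sql.window import Window


def assign_balanced_partitions(R, id_col='id', n_partitions=3):
    rating_cols = [c for c in R.columns if c != id_col]

    # count the non-null ratings per user (one row per user)
    n_rated = sum(F.when(F.col(c).isNotNull(), 1).otherwise(0)
                  for c in rating_cols)
    counts = R.select(id_col, n_rated.alias('n_ratings'))

    # rank users by rating count (descending) and assign groups round-robin
    w = Window.orderBy(F.desc('n_ratings'))
    assigned = counts.select(
        id_col,
        ((F.row_number().over(w) - 1) % n_partitions).alias('partition'))

    # join the group back onto the ratings and repartition on it
    return (R.join(assigned, on=id_col)
             .repartition(n_partitions, 'partition')
             .drop('partition'))

Using row_number() modulo the partition count is the round-robin assignment illustrated above. The Window has no partitionBy, so the ranking pulls the per-user counts onto a single partition, but that table has only one row per user, so it stays small compared to the pairwise comparison work.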