Efficiently compute item collaborative filtering similarity using numba, polars and numpy

Disclaimer: This question is part of a thread that includes two other SO questions (q1, q2).

The data resemble movie ratings from the ratings.csv file (~891 MB) of the ml-latest dataset.

First I read the CSV file with the polars library:

movie_ratings = pl.read_csv(os.path.join(application_path + data_directory, "ratings.csv"))

Let's assume we want to compute the similarity between the movies seen by user=1 (for example 62 movies) and the rest of the movies in the dataset. FYI, the dataset has ~83,000 movies, so for each other_movie (82,938 of them) we compute a similarity with each of the 62 movies seen by user 1. That is 62 × 82,938 ≈ 5.1 million similarity computations.

For this example the benchmarks reported cover only 400 of the 82,938 other_movies.

To do so, I create two polars dataframes: one dataframe with the other_movies (~82,938 rows) and a second dataframe with only the movies seen by the user (62 rows).

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1 (data related to user 1)
user_rated_movies = list(user_ratings.select(pl.col("movieId")).to_numpy().ravel()) #movies seen by user1
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)

items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    )
    .group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    ).group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

The result is two polars dataframes with one row per movie and two aggregated columns: the users who have seen the movie and the ratings from each of those users.


The first dataframe contains only other_movies that we can potentially recommend to user1, since he/she has not seen them.

The second dataframe contains only the movies seen by the user.
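
For reference, a minimal sketch of what I expect each aggregated dataframe to look like (dtypes assumed from what polars infers for ratings.csv; not verified output):

print(items_metadata.schema)
# roughly: {'movieId': Int64, 'users_seen_movie': List(Int64), 'user_ratings': List(Float64)}
print(items_metadata.head(2))
# one row per movie, with list columns of the users who rated it and their ratings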

Next, my approach is to iterate over each row of the first dataframe and apply a UDF.

item_metadata_similarity = (
    items_metadata.with_columns(
        similarity_score=pl.struct(pl.all()).map_elements(
            lambda row: item_compute_similarity_scoring_V2(row, target_items_metadata),
            return_dtype=pl.List(pl.List(pl.Float64)),
            strategy="threading"
        )
    )
)

where item_compute_similarity_scoring_V2 is defined as:

def item_compute_similarity_scoring_V2(
    row,
    target_movies_metadata:pl.DataFrame
):
    users_item1 = np.asarray(row["users_seen_movie"])
    ratings_item1 = np.asarray(row["user_ratings"])
    computed_similarity: list=[]
    for row2 in target_movies_metadata.iter_rows(named=True): #iter over each row from the second dataframe with the movies seen by the user.
        users_item2=np.asarray(row2["users_seen_movie"])
        ratings_item2=np.asarray(row2["user_ratings"])
        r1, r2 = item_ratings(users_item1, ratings_item1, users_item2, ratings_item2)
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = compute_similarity_score(r1, r2)
            if similarity_score > 0.0: #filter out negative or zero similarity scores
                computed_similarity.append((row2["movieId"], similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)
    return most_similar_pairs

and where item_ratings & compute_similarity_score are defined as:

def item_ratings(u1: np.ndarray, r1: np.ndarray, u2: np.ndarray, r2: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    common_elements, indices1, indices2 = np.intersect1d(u1, u2, return_indices=True)
    sr1 = r1[indices1]
    sr2 = r2[indices2]
    assert len(sr1)==len(sr2), "ratings don't have same lengths"
    return sr1, sr2
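
For clarity, np.intersect1d with return_indices=True returns the sorted common users together with their positions in each input array, so the two rating slices stay aligned. A minimal illustration (values are made up):

import numpy as np

u1 = np.array([1, 3, 5, 7]); r1 = np.array([4.0, 3.5, 5.0, 2.0])  # users/ratings for movie A
u2 = np.array([3, 4, 7]);    r2 = np.array([1.0, 4.5, 3.0])       # users/ratings for movie B

common, idx1, idx2 = np.intersect1d(u1, u2, return_indices=True)
print(common)    # [3 7]      -> users who rated both movies
print(r1[idx1])  # [3.5 2. ]  -> movie A ratings from those users
print(r2[idx2])  # [1. 3.]    -> movie B ratings from those users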

from numba import jit

@jit(nopython=True, parallel=True)
def compute_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
    assert(array1.shape[0] == array2.shape[0])
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i]*array2[i]
        a1a1 += array1[i]*array1[i]
        a2a2 += array2[i]*array2[i]
    cos_theta = 1.0
    if a1a1!=0 and a2a2!=0:
        cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
    return cos_theta
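
Note that numba only parallelises explicit prange loops (or whole-array expressions); with parallel=True and a plain range loop the function is compiled but the loop runs serially. A hedged sketch of a prange variant (assuming the per-pair arrays are long enough for threading to pay off, which may not be the case for small user overlaps):

import numpy as np
from numba import njit, prange

@njit(parallel=True)
def compute_similarity_score_prange(array1: np.ndarray, array2: np.ndarray) -> float:
    # same cosine computation, but prange lets numba split the reduction across threads
    a1a2 = 0.0
    a1a1 = 0.0
    a2a2 = 0.0
    for i in prange(array1.shape[0]):
        a1a2 += array1[i] * array2[i]
        a1a1 += array1[i] * array1[i]
        a2a2 += array2[i] * array2[i]
    if a1a1 != 0.0 and a2a2 != 0.0:
        return float(a1a2 / np.sqrt(a1a1 * a2a2))
    return 1.0  # same fallback value as the original function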

The function basically iterates over each row of the second dataframe and, for each row, computes the similarity between the other_movie and the movie seen by the user. Thus, for 400 movies we perform 400 × 62 iterations, generating 62 similarity scores per other_movie.

The result of each computation is a list of pairs with schema [[1, 0.20], [110, 0.34]]... (62 pairs per other_movie).


Benchmarks for 400 movies

  1. INFO - Item-Item: Computed similarity scores for 400 movies in: 0:05:49.887032
  2. ~6 minutes.
  3. ~5 GB of RAM used.

I would like to identify how I can improve the computations by using native polars commands or by exploiting the numba framework for parallelism.

Update - 2nd approach using to_numpy() operations without iter_rows() and map_elements()

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1
user_rated_movies = user_ratings.select(pl.col("movieId")).to_numpy().ravel()
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(items_metadata.head(5))
target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(target_items_metadata.head(5))

With this second approach items_metadata and target_items_metadata are two large polars tables.

Then my next step is to convert both tables to numpy.ndarrays with the to_numpy() method.

# column order in the raw ratings table (ml-latest): 0=userId, 1=movieId, 2=rating, 3=timestamp
items_metadata_array = items_metadata.to_numpy()
target_items_metadata_array = target_items_metadata.to_numpy()
computed_similarity_scores:dict = {}
for i, other_movie in enumerate(potential_movies_to_recommend[:400]): #take the first 400 unseen movies by user 1
    mask = items_metadata_array[:, 1] == other_movie
    other_movies_chunk = items_metadata_array[mask]
    u1 = other_movies_chunk[:,0].astype(np.int32)
    r1 = other_movies_chunk[:,2].astype(np.float32)
    computed_similarity: list=[]
    for user_movie in user_rated_movies:  # inner loop over the 62 movies seen by user 1
        print(user_movie)
        mask = target_items_metadata_array[:, 1] == user_movie
        target_movie_chunk = target_items_metadata_array[mask]
        u2 = target_movie_chunk[:,0].astype(np.int32)
        r2 = target_movie_chunk[:,2].astype(np.float32)
        common_r1, common_r2 = item_ratings(u1, r1, u2, r2)
        if common_r1.shape[0] != 0 and common_r2.shape[0] != 0:
            similarity_score = compute_similarity_score(common_r1, common_r2)
            if similarity_score > 0.0:
                computed_similarity.append((user_movie, similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
    computed_similarity_scores[str(other_movie)] = most_similar_pairs

Benchmarks of the second approach (8:50, slower than the ~6 minutes of the first approach)

  • Item-Item: Computed similarity scores for 400 movies in: 0:08:50.537102

Update - 3rd approach using iter_rows() operations

In my third approach, I get better results than with the previous two methods, computing the scores in approximately 2 minutes for user 1 and 400 movies.

items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies)
    )
    .group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies)
    ).group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

items_metadata is the metadata of the other_movies not seen by user 1.

target_items_metadata is the metadata of the movies rated by user 1. By the term metadata I refer to the two aggregated .agg() columns, users_seen_movie and user_ratings.

Finally, I create two for loops using the iter_rows() method from polars:

def cosine_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
    assert(array1.shape[0] == array2.shape[0])
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i]*array2[i]
        a1a1 += array1[i]*array1[i]
        a2a2 += array2[i]*array2[i]
    # cos_theta = 1.0
    cos_theta = 0.0
    if a1a1!=0 and a2a2!=0:
        cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
    return max(0.0, cos_theta)
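
A quick sanity check with two tiny rating vectors (values chosen only for illustration):

a = np.array([4.0, 3.5, 5.0])
b = np.array([4.0, 3.0, 4.5])
print(cosine_similarity_score(a, b))  # ~0.998, i.e. 49.0 / sqrt(53.25 * 45.25)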

for row1 in items_metadata.iter_rows():  # row1 = (movieId, users_seen_movie, user_ratings)
    computed_similarity: list = []
    for row2 in target_items_metadata.iter_rows():
        similarity_score = 0.0
        r1, r2 = item_ratings(np.asarray(row1[1]), np.asarray(row1[2]), np.asarray(row2[1]), np.asarray(row2[2]))
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = cosine_similarity_score(r1, r2)
        computed_similarity.append((row2[0], similarity_score if similarity_score > 0 else 0))
    computed_similarity_scores[str(row1[0])] = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]

Benchmarks for 400 movies

  1. INFO - Item-Item: Computed similarity scores for 400 movies in: 0:01:50
  2. ~2 minutes.
  3. ~4.5 GB of RAM used.

asked Dec 29 '25 by NikSp


1 Answer

I'm not too familiar with numba, so before trying to compare timings, the first thing I would try to do is create a "fully native" Polars approach:

This is a direct translation of the current approach (i.e. it still contains the "double for loop") so it just serves as a baseline attempt.

Because it uses the Lazy API, nothing in the loops is computed.

That is all done when .collect() is called (which allows Polars to parallelize the work).

The > 0.0 filtering for the similarity_score would be done after the results are collected (see the short sketch after the output below).

input_id = 1

is_user_rating = pl.col("userId") == input_id

can_recommend = (
    pl.col("movieId").is_in(pl.col("movieId").filter(is_user_rating)).not_()
)

cosine_similarity = (
    pl.col('rating').dot('rating_right') /  
    ( pl.col('rating').pow(2).sum().sqrt() * 
      pl.col('rating_right').pow(2).sum().sqrt() ) 
)

user_rated_movies = movie_ratings.filter(is_user_rating).select("movieId").to_series()

potential_movies_to_recommend = (
    movie_ratings.filter(can_recommend).select(pl.col("movieId").unique().sort())
)

# use the Lazy API so we can compute in parallel
df = movie_ratings.lazy()

computed_similarity_scores = []
for other_movie in potential_movies_to_recommend.head(1).to_series(): # .head(N) potential movies
    for user_movie in user_rated_movies:
        score = (
            df.filter(pl.col("movieId") == user_movie)
              .join(
                 df.filter(pl.col("movieId") == other_movie),
                 on = "userId"
              )
              .select(cosine = cosine_similarity)
              .select(user_movie=user_movie, other_movie=other_movie, similarity_score="cosine")
        )
        computed_similarity_scores.append(score)
        
# All scores are computed in parallel
computed_similarity_scores_polars = pl.concat(computed_similarity_scores).collect()
shape: (62, 3)
┌────────────┬─────────────┬──────────────────┐
│ user_movie ┆ other_movie ┆ similarity_score │
│ ---        ┆ ---         ┆ ---              │
│ i32        ┆ i32         ┆ f64              │
╞════════════╪═════════════╪══════════════════╡
│ 1          ┆ 2           ┆ 0.95669          │
│ 110        ┆ 2           ┆ 0.950086         │
│ 158        ┆ 2           ┆ 0.957631         │
│ 260        ┆ 2           ┆ 0.945542         │
│ …          ┆ …           ┆ …                │
│ 49647      ┆ 2           ┆ 0.9411           │
│ 52458      ┆ 2           ┆ 0.955353         │
│ 53996      ┆ 2           ┆ 0.930388         │
│ 54259      ┆ 2           ┆ 0.95469          │
└────────────┴─────────────┴──────────────────┘
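
As noted above, the > 0.0 filter (and any top-k selection) can then be applied after collecting, for example (k_similar_user is the variable from the question):

positive_scores = computed_similarity_scores_polars.filter(
    pl.col("similarity_score") > 0.0
)

# top-k most similar user movies per other_movie
top_k = (
    positive_scores
    .sort("similarity_score", descending=True)
    .group_by("other_movie", maintain_order=True)
    .head(k_similar_user)
)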

Testing with .head(100), I get a 58s runtime compared to 111s for your example; memory consumption is the same.

duckdb

As a comparison, duckdb with the first 400 movies (the equivalent of .head(400)) runs in 5s:

import duckdb

df = duckdb.sql("""
with 
   df     as (from 'imdb.parquet'),
   user   as (from df where movieId in (from df select movieId where userId = 1)),
   movies as (from df where movieId not in (from df select movieId where userId = 1)),
   other  as (from df where movieId in (from movies select distinct movieId order by movieId limit 400))
   
from
   user join other using (userId)
   
select   
   user.movieId user_movie,
   other.movieId other_movie,
   list_cosine_similarity(
      list(user.rating), list(other.rating)
   ) similarity_score
   
group by 
   user_movie, other_movie   
order by 
   user_movie, other_movie
""").pl()
shape: (24_764, 3)
┌────────────┬─────────────┬──────────────────┐
│ user_movie ┆ other_movie ┆ similarity_score │
│ ---        ┆ ---         ┆ ---              │
│ i64        ┆ i64         ┆ f64              │
╞════════════╪═════════════╪══════════════════╡
│ 1          ┆ 2           ┆ 0.95669          │
│ 1          ┆ 3           ┆ 0.941348         │
│ 1          ┆ 4           ┆ 0.92169          │
│ 1          ┆ 5           ┆ 0.943999         │
│ …          ┆ …           ┆ …                │
│ 54259      ┆ 407         ┆ 0.941241         │
│ 54259      ┆ 408         ┆ 0.934745         │
│ 54259      ┆ 409         ┆ 0.937361         │
│ 54259      ┆ 410         ┆ 0.94937          │
└────────────┴─────────────┴──────────────────┘
Elapsed time: 5.02638 seconds
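
To reproduce this, the ratings data has to be available as a parquet file first; assuming the same polars dataframe as in the question, something like:

# one-off export so duckdb can read the data as 'imdb.parquet'
movie_ratings.write_parquet("imdb.parquet")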

answered Dec 31 '25 by jqurious


