
Continuous aggregates over large datasets

I'm trying to think of an algorithm to solve this problem I have. It's not a HW problem, but for a side project I'm working on.

There's a table A that has on the order of 10^5 rows and gains new rows on the order of 10^2 every day.

Table B has on the order of 10^6 rows and gains new rows on the order of 10^3 every day. There's a one-to-many relation from A to B (many B rows for a given row in A).

I was wondering how I could do continuous aggregates for this kind of data. I would like to have a job that runs every ~10 minutes and does this: for every row in A, find all rows in B related to it that were created in the last day, week, and month (then sort by count) and save them in a different DB or cache them.

If this is confusing, here's a practical example: say table A has Amazon products and table B has product reviews. We would like to show a sorted list of products with the most reviews in the last 4 hours, day, week, etc. New products and reviews are added at a fast pace, and we'd like that list to be as up-to-date as possible.

The current implementation I have is just a for loop (pseudo-code):

result = {}

for product in db_products:
    # fetch all reviews for this product created after the cutoff time
    reviews = db_reviews(product_id=product.id, created_after=some_time)
    result[product.id] = {
        'reviews': reviews,
        'reviews_count': len(reviews),
    }

# sort products by review count, highest first
sorted_result = sorted(result.items(),
                       key=lambda item: item[1]['reviews_count'],
                       reverse=True)

return sorted_result

I do this every hour and save the result in a JSON file to serve. The problem is that this doesn't really scale well and takes a long time to compute.

So, where could I look to solve this problem?

UPDATE:

Thank you for your answers, but I ended up learning and using Apache Storm.

KGo asked Jan 23 '16


1 Answer

Summary of requirements

You have two fairly large tables in a database, and you need to regularly create aggregates for past time periods (hour, day, week, etc.) and store the results in another database.

I will assume that once a time period has passed, there are no changes to the related records; in other words, the aggregate for a past period always gives the same result.

Proposed solution: Luigi

Luigi is a framework for plumbing together dependent tasks, and one of its typical uses is calculating aggregates for past periods.

The concept is as follows:

  • You write a simple Task subclass, which defines the required input data, the output data (called a Target), and the process to create that output.
  • Tasks can be parametrized; a typical parameter is a time period (a specific day, hour, week, etc.).
  • Luigi can be stopped in the middle of processing and started again later. It considers any task whose target already exists to be complete and will not rerun it (you would have to delete the target content to make it rerun).

In short: if the target exists, the task is done.
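As a rough illustration, a minimal task for your case could look like the sketch below; the file layout and the helper count_recent_reviews are assumptions for this example, not a finished design:

import datetime
import json

import luigi


class ReviewCounts(luigi.Task):
    # Typical Luigi parameter: the past day this aggregate covers
    day = luigi.DateParameter()

    def output(self):
        # The target: if this file exists, Luigi considers the task done
        return luigi.LocalTarget("aggregates/reviews-%s.json" % self.day.isoformat())

    def run(self):
        # count_recent_reviews is a hypothetical helper that queries table B
        # for reviews created on self.day and returns {product_id: count}
        counts = count_recent_reviews(self.day)
        with self.output().open("w") as f:
            json.dump(counts, f)


if __name__ == "__main__":
    # Run locally without the central scheduler
    luigi.build([ReviewCounts(day=datetime.date(2016, 1, 22))], local_scheduler=True)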

This works for multiple types of targets, such as files in the local file system, on Hadoop, on AWS S3, and also records in a database.

To prevent half-done results, target implementations take care of atomicity, so e.g. files are first created in a temporary location and moved to the final destination only after they are complete.
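With a file target, for example, Luigi writes through a temporary file when you open the target for writing, so a half-written file never appears at the final path (small sketch):

import luigi

target = luigi.LocalTarget("aggregates/latest.json")

# The data goes to a temporary file and is only moved to the final path
# when the with-block closes successfully.
with target.open("w") as f:
    f.write('{"status": "complete"}')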

For databases, there are marker structures to denote that a given import has completed.

You are free to create your own target implementations (a target has to create something and provide an exists method to check whether the result exists).
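In its simplest form a custom target is just a class with an exists method; here is a sketch of a marker-file target (purely illustrative, not part of Luigi itself):

import os

import luigi


class MarkerFileTarget(luigi.Target):
    # Illustrative custom target: "done" means a marker file exists

    def __init__(self, path):
        self.path = path

    def exists(self):
        # Luigi calls this to decide whether the producing task still needs to run
        return os.path.exists(self.path)

    def touch(self):
        # Convenience helper (not required by Luigi) for a task to mark itself complete
        with open(self.path, "w") as f:
            f.write("done")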

Using Luigi for your task

For the task you describe, you will probably find everything you need already present. Just a few tips:

The class luigi.postgres.CopyToTable allows you to store records in a Postgres database. The target automatically creates a so-called "marker table" where it marks all completed tasks.

There are similar classes for other types of databases; one of them uses SQLAlchemy and will probably cover the database you use, see the class luigi.contrib.sqla.CopyToTable.

The Luigi documentation has a working example of importing data into an SQLite database.
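A hedged sketch of how such a CopyToTable task could look; the connection settings, the table layout, and the upstream ReviewCounts task from the earlier sketch are placeholders for your own setup:

import json

import luigi
import luigi.postgres  # luigi.contrib.postgres in newer Luigi releases


class StoreReviewCounts(luigi.postgres.CopyToTable):
    # Connection settings - placeholders, better kept in luigi.cfg
    host = "localhost"
    database = "aggregates"
    user = "luigi"
    password = "secret"
    table = "review_counts"

    # Used to create the table (and the marker table) if they are missing
    columns = [("product_id", "INT"),
               ("reviews_last_day", "INT")]

    day = luigi.DateParameter()

    def requires(self):
        # Upstream task producing the per-product counts (see the earlier sketch)
        return ReviewCounts(day=self.day)

    def rows(self):
        # Yield one tuple per row to insert into the target table
        with self.input().open("r") as f:
            counts = json.load(f)
        for product_id, count in counts.items():
            yield (product_id, count)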

A complete implementation is beyond what is feasible in a Stack Overflow answer, but I am sure you will experience the following:

  • The code to do the task is really clear - no boilerplate coding, you write only what has to be done.
  • Nice support for working with time periods - even from the command line, see e.g. Efficiently triggering recurring tasks. Luigi even takes care of not going too far into the past, to prevent generating too many tasks that might overload your servers (the default values are set very reasonably and can be changed).
  • The option to run tasks on multiple servers (using the central scheduler, which is provided with the Luigi implementation).

I have processed huge numbers of XML files with Luigi and have also written tasks that import aggregated data into a database, so I can recommend it (I am not the author of Luigi, just a happy user).

Speeding up database operations (queries)

If your task suffers from too long an execution time for the database query, you have a few options:

  • If you are counting reviews per product in Python, consider trying an SQL query instead - it is often much faster. It should be possible to create an SQL query which uses COUNT on the proper records and returns directly the number you need. With GROUP BY you can even get the summary for all products in one run (see the sketch after this list).
  • Set up a proper index, probably on the "reviews" table on the "product" and "time period" columns. This should speed up the query, but make sure it does not slow down inserting new records too much (too many indexes can cause that).
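For instance, the per-product counting can be pushed into one grouped query, with an index to back it. This is only a sketch: it assumes an open DB-API connection conn and psycopg2-style placeholders, and the table and column names mirror the review example above, so adjust them to your schema and driver.

# One query instead of a per-product Python loop
query = """
    SELECT product_id, COUNT(*) AS reviews_count
    FROM reviews
    WHERE created_at >= %(since)s
    GROUP BY product_id
    ORDER BY reviews_count DESC
"""
cursor = conn.cursor()
cursor.execute(query, {"since": some_time})
top_products = cursor.fetchall()  # [(product_id, reviews_count), ...]

# A supporting index on the filtering/grouping columns, e.g.:
# CREATE INDEX reviews_created_product_idx ON reviews (created_at, product_id);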

It might happen that with an optimized SQL query you get a working solution even without using Luigi.

Jan Vlcinsky answered Sep 30 '22