
Django, Celery, Redis, RabbitMQ: Chained Tasks for Fanout-On-Writes

I've been watching Rick Branson's PyCon video: Messaging at Scale at Instagram. You might want to watch the video in order to answer this question. Rick Branson uses Celery, Redis and RabbitMQ. To get you up to speed, each user has a Redis list for their homefeed. Each list contains the media IDs of photos posted by the people they follow.

Justin Bieber, for example, has 1.5 million followers. When he posts a photo, the ID of that photo needs to be inserted into the Redis list of each of his followers. This is called the Fanout-On-Write approach. However, there are a few reliability problems with this approach. It can work, but for someone like Justin Bieber or Lady Gaga, who have millions of followers, doing this inside the web request (where you have 0-500ms to complete the request) can be a problem: the request will time out before the fanout finishes.

So Rick Branson decided to use Celery, an asynchronous task queue/job queue based on distributed message passing. Any heavy lifting, such as inserting media IDs into followers' lists, can be done asynchronously, outside of the web request. The request will complete and Celery will continue to insert the IDs into all of the lists.

This approach works wonders. But again, you don't want to deliver all of Justin's followers to Celery in one huge chunk, because it would tie up a Celery worker. Why not have multiple workers work on it at the same time so it finishes faster? You'd want to break this chunk up into smaller batches and have different workers work on each batch. Rick Branson uses batches of 10,000 followers, and he uses something called a cursor to keep inserting media IDs until all of Justin Bieber's followers have been processed. In the video, he talks about this at 3:56.
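To make the batching arithmetic concrete, here is a minimal sketch of how a follower list could be split into windows of 10,000. The helper name `batch_windows` is hypothetical and not from the video; the (start, stop) pairs are inclusive because that is how Redis LRANGE interprets its arguments.

```python
def batch_windows(total, batch_size=10000):
    """Yield inclusive (start, stop) index pairs that cover `total` items.

    Hypothetical helper for illustration; the pairs are shaped so they
    could be passed straight to LRANGE, whose stop index is inclusive.
    """
    for start in range(0, total, batch_size):
        stop = min(start + batch_size, total) - 1
        yield (start, stop)


# A user with 20,007 followers needs three batches.
windows = list(batch_windows(20007))
print(windows)  # [(0, 9999), (10000, 19999), (20000, 20006)]
```

Each window is an independent unit of work, so in principle each one can be handled by a separate task.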

I was wondering if anyone could explain this more and show examples of how it can be done. I'm currently trying to attempt the same setup. I use Andy McCurdy's redis-py python client library to communicate with my redis server. For every user on my service, I create a redis followers list.

So a user with an ID of 343 would have a list at the following key:

followers:343

I also create a homefeed list for each user. Every user has their own list. So a user with an ID of 1990 would have a list at the following key:

homefeed:1990

The "followers:343" Redis list contains the IDs of all the people who follow user 343. User 343 has 20,007 followers. Below, I retrieve every ID in the list, from index 0 to the last element (-1), just to show you what it looks like.

>>> r_server.lrange("followers:343", 0, -1)
['8', '7', '5', '3', '65', '342', '42', etc...] ---> for the sake of example, assume this list has another 20,000 IDs.

What you see is a list of the IDs of all the users who follow user 343.

Here is my proj/mydjangoapp/tasks.py which contains my insert_into_homefeed function:

from __future__ import absolute_import
from celery import shared_task
import redis
pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX')

@shared_task
def insert_into_homefeed(photo_id, user_id):
    # Grab the list of all follower IDs from Redis for user_id.
    r_server = redis.Redis(connection_pool=pool)

    followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1)

    # Now for each follower_id in followers_list, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list.

    for follower_id in followers_list:
        homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)
    return "Fan Out Completed for %s" % (user_id)

When called from the Django view, this task grabs the IDs of all the people who follow user 343 and then inserts the photo ID into each of their homefeed lists.

Here is my upload view in my proj/mydjangoapp/views.py. I basically call Celery's delay method and pass along the necessary variables so that the request ends quickly:

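import json

from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt

# Assumption: the Photo model lives in this app's models module.
from mydjangoapp.models import Photo
# Import the Celery Task Here
from mydjangoapp.tasks import insert_into_homefeed


@csrf_exempt
def Upload(request):
    if request.method == 'POST':
        data = json.loads(request.body)
        newPhoto = Photo.objects.create(user_id=data['user_id'], description=data['description'], photo_url=data['photo_url'])
        newPhoto_ID = newPhoto.pk
        # Queue the fan-out so the request can return immediately.
        insert_into_homefeed.delay(newPhoto_ID, data['user_id'])
        return HttpResponse("Request Completed")
    # A Django view must always return a response; reject non-POST requests.
    return HttpResponse(status=405)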

How can I do this in such a way that it will be batched by 10,000?

asked Jan 08 '14 21:01 by deadlock



1 Answer

The approach described in the video is task "chaining".

To get your task method up and running as a chain, you want to add an extra parameter that represents the index into the list of followers. Instead of working on the full list of followers, the task only works on a fixed batch size, starting from the index argument it was handed. At completion, the task should create a new task and pass the new index.

INSERT_INTO_HOMEFEED_BATCH = 10000

@shared_task
def insert_into_homefeed(photo_id, user_id, index=0):
    # Grab the list of all follower IDs from Redis for user_id.
    r_server = redis.Redis(connection_pool=pool)

    range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1  # LRANGE's stop index is inclusive

    followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit)

    if not followers_list_batch:
        return # zero followers or no more batches

    # Now for each follower_id in followers_list_batch, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list.
    for follower_id in followers_list_batch:
        homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)

    insert_into_homefeed.delay(photo_id, user_id, range_limit + 1)

This works well because Redis lists are ordered and the lrange command returns an empty list, rather than an error, when the requested range starts past the end of the list. That empty result is what ends the chain.
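To check the index arithmetic without a Redis server or a Celery worker, the chain can be simulated in plain Python. This is only a sketch: `FakeRedis` and `fan_out` are hypothetical names introduced here, the fake supports non-negative indexes only, and the loop stands in for the task re-queuing itself with the next index.

```python
BATCH = 10000


class FakeRedis:
    """In-memory stand-in (hypothetical) for the two list commands used here."""

    def __init__(self):
        self.lists = {}

    def lrange(self, key, start, stop):
        # Like LRANGE, `stop` is inclusive and a range past the end yields [].
        # Only non-negative indexes are handled in this sketch.
        return self.lists.get(key, [])[start:stop + 1]

    def lpush(self, key, value):
        # Like LPUSH, insert at the head of the list.
        self.lists.setdefault(key, []).insert(0, value)


def fan_out(r, photo_id, user_id, batch=BATCH):
    """Simulate the chain; each loop iteration stands in for one task run."""
    index, calls = 0, 0
    while True:
        calls += 1
        followers = r.lrange("followers:%s" % user_id, index, index + batch - 1)
        if not followers:
            return calls  # empty batch: the chain stops here
        for follower_id in followers:
            r.lpush("homefeed:%s" % follower_id, photo_id)
        index += batch  # the real task would pass this to .delay()


r = FakeRedis()
r.lists["followers:343"] = [str(i) for i in range(20007)]
calls = fan_out(r, 1, 343)
print(calls)  # 4: three batches with followers, plus one empty terminating call
```

One caveat to keep in mind: the cursor is a plain list index, so if new followers are pushed onto the head of the followers list while the chain is running, later batches shift and some followers may receive the photo ID twice.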

answered Oct 19 '22 05:10 by Dwight Gunning