 

Django, Celery with Recursion and Twitter API

I'm working with Django 1.4 and Celery 3.0 (RabbitMQ) to build an assemblage of tasks for sourcing and caching queries to the Twitter API 1.1. One thing I am trying to implement is a chain of tasks, the last of which makes a recursive call to the task two nodes back, based on the responses so far and the data in the most recently retrieved response. Concretely, this allows the app to traverse a user timeline (up to 3200 tweets), taking into account that any given request can only yield at most 200 tweets (a limitation of the Twitter API).
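The max_id pagination this chain implements can be sketched in plain Python, independent of Celery. Here `fetch_page` is a hypothetical stand-in for the real API call (it serves fake tweet ids so the sketch is self-contained); the 200-per-page cap and the "step below the smallest id seen" cursoring mirror the behavior described above:

```python
# Sketch of max_id pagination over a timeline. fetch_page is a stand-in
# for the real Twitter call: it returns up to `count` tweets with
# id <= max_id, newest first, drawn from a fake timeline of ids 1000..1.

def fetch_page(user_id, max_id, count):
    timeline = range(1000, 0, -1)  # fake tweet ids, newest first
    page = [{'id': i} for i in timeline if i <= max_id]
    return page[:count]

def get_timeline(user_id, total_requested, per_page=200):
    tweets, max_id = [], 2 ** 128  # start from an arbitrarily large max_id
    while len(tweets) < total_requested:
        page = fetch_page(user_id, max_id, per_page)
        if not page:
            break  # timeline exhausted before total_requested was reached
        tweets.extend(page)
        # next request starts one below the smallest id seen, which
        # avoids the one-tweet overlap that cursored results produce
        max_id = min(t['id'] for t in page) - 1
    return tweets[:total_requested]
```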

Key components of my tasks.py are pasted below, but first I'll show the chain I'm calling from my Python shell (it will ultimately be launched via user input in the final web app). Given:

>>> request = dict(twitter_user_id='#1010101010101#',
        total_requested=1000,
        max_id=random.getrandbits(128))  # e.g. an arbitrarily large number

I call:

>>> res = (twitter_getter.s(request) |
           pre_get_tweets_for_user_id.s() |
           get_tweets_for_user_id.s() |
           timeline_recursor.s()).apply_async()

The critical thing is that timeline_recursor can initiate a variable number of get_tweets_for_user_id subtasks. When timeline_recursor reaches its base case, it should return a response dict as defined below:

import random

from celery import task

@task(rate_limit=None)
def timeline_recursor(request):
    previous_tweets = request.get('previous_tweets', None)  # None the first time through
    if not previous_tweets:
        previous_tweets = []  # so we initialize to an empty list
    tweets = request.get('tweets', None)

    twitter_user_id = request['twitter_user_id']
    previous_max_id = request['previous_max_id']
    total_requested = request['total_requested']
    pulled_in = request['pulled_in']

    remaining_requested = total_requested - pulled_in
    if previous_max_id:
        remaining_requested += 1  # cursored results always have one overlapping id
    else:
        previous_max_id = random.getrandbits(128)  # for the first time through the loop

    new_max_id = min(tweet['id'] for tweet in tweets)
    test = lambda x, y: x < y

    if remaining_requested < 0:  # because we overshoot by requesting batches of 200
        remaining_requested = 0

    if tweets:
        previous_tweets.extend(tweets)

    if tweets and remaining_requested and (pulled_in > 1) and test(new_max_id, previous_max_id):

        request = dict(user_pk=user_pk,
                       twitter_user_id=twitter_user_id,
                       max_id=new_max_id,
                       total_requested=remaining_requested,
                       tweets=previous_tweets)

        #problem happens in this part of the logic???

        response = (twitter_getter_config.s(request) |
                    get_tweets_for_user_id.s() |
                    timeline_recursor.s()).apply_async()

    else:
        # base case: combine all tweets pulled in thus far and return them as
        # "tweets" -- to be saved in the db or otherwise consumed
        response = dict(twitter_user_id=twitter_user_id,
                        total_requested=total_requested,
                        tweets=previous_tweets)
    return response

My expected response for res.result is therefore a dictionary comprised of a Twitter user id, the requested number of tweets, and the set of tweets pulled in across successive calls. However, all is not well in recursive task land. When I run the chain above and check res.status right after initiating it, it reports "SUCCESS", even though in the log view of my Celery worker I can see that the chained recursive calls to the Twitter API are being made as expected, with the correct parameters. I can also run res.result immediately, even while the chained tasks are still executing; it yields an AsyncResult instance id. Even after the recursively chained tasks have finished running, res.result remains an AsyncResult id.

On the other hand, I can access my full set of tweets by going to res.result.result.result.result['tweets']. I can deduce that each of the chained subtasks is indeed occurring; I just don't understand why res.result doesn't have the expected result. The recursive return that should happen when timeline_recursor hits its base case doesn't appear to propagate as expected.
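As a stopgap for the nesting described above, one can walk the chain of handles until a plain payload appears. Here is a minimal, framework-agnostic sketch; the `Nested` class is a stand-in for Celery's AsyncResult, used only so the example runs on its own:

```python
# Minimal sketch: unwrap a chain of nested result handles. Against Celery
# this would follow AsyncResult.result; here a stand-in class keeps it
# self-contained and runnable.

class Nested:
    """Stand-in for an AsyncResult whose .result may be another handle."""
    def __init__(self, result):
        self.result = result

def unwrap(handle, max_depth=10):
    # Follow .result until we reach something without one (the payload),
    # with a depth cap as a guard against pathological nesting.
    depth = 0
    while hasattr(handle, 'result') and depth < max_depth:
        handle = handle.result
        depth += 1
    return handle

# e.g. res.result.result.result.result['tweets'] becomes
# unwrap(res)['tweets'] regardless of nesting depth.
```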

Any thoughts on what can be done? Recursion in Celery can be quite powerful, but to me at least, it's not totally apparent how we should think about recursive functions that use Celery, and how this affects the logic of return statements in chained tasks.

Happy to clarify as needed, and thanks in advance for any advice.

Benjamin White asked Feb 22 '26 01:02

1 Answer

What does apply_async return (as in, what type of object)?

I don't know Celery, but in Twisted and many other async frameworks, a call like that returns immediately (usually True, or perhaps an object that can track state) because the tasks are deferred into the queue.

Again, not knowing Celery, I would guess that this is what's happening:

You are: defining response immediately as the async deferred task, but then trying to act on it as if the results have already come in.

You want to be: defining a callback routine that runs on the results and returns a value once the task has completed.

Looking at the Celery docs, apply_async accepts callbacks via link -- and I couldn't find any example of someone trying to capture a return value from it.
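The callback pattern described above can be illustrated with the stdlib's concurrent.futures. This is an analogy, not Celery's actual API (in Celery you would pass a task signature via apply_async(link=...)); `fetch_tweets` is a hypothetical stand-in for the deferred work:

```python
# Analogy using stdlib futures: instead of reading a result right after
# submitting deferred work, attach a callback that fires on completion.

from concurrent.futures import ThreadPoolExecutor

collected = []

def fetch_tweets(user_id):
    # stand-in for the deferred Twitter fetch
    return {'twitter_user_id': user_id, 'tweets': ['t1', 't2']}

def on_done(future):
    # runs once the deferred work has actually finished
    collected.append(future.result()['tweets'])

with ThreadPoolExecutor() as pool:
    fut = pool.submit(fetch_tweets, '1010101010101')
    fut.add_done_callback(on_done)
# the `with` block waits for completion, so `collected` is populated here
```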

Jonathan Vanasco answered Feb 24 '26 15:02


