I'm working with Django 1.4 and Celery 3.0 (rabbitmq) to build an assemblage of tasks for sourcing and caching queries to Twitter API 1.1. One thing I am trying to implement is chain of tasks, the last of which makes a recursive call to the task two nodes back, based on responses so far and response data in most recently retrieved response. Concretely, this allows the app to traverse a user timeline (up to 3200 tweets), taking into account that any given request can only yield at most 200 tweets (limitation on Twitter API).
Key components of my tasks.py can be seen here, but before pasting, I'll show the chain i'm calling from my Python shell (but that will ultimately be launched via user inputs in the final web app). Given:
>>request(twitter_user_id='#1010101010101#,
total_requested=1000,
max_id = random.getrandbits(128) #e.g. arbitrarily large number)
I call:
>> res = (twitter_getter.s(request) |
pre_get_tweets_for_user_id.s() |
get_tweets_for_user_id.s() |
timeline_recursor.s()).apply_async()
The critical thing is that timeline_recursor can initiate a variable number of get_tweets_for_user_id subtasks. When timeline_recursor is in its base case, it should return a response dict as defined here:
@task(rate_limit=None)
def timeline_recursor(request):
previous_tweets=request.get('previous_tweets', None) #If it's the first time through, this will be None
if not previous_tweets:
previous_tweets = [] #so we initiate to empty array
tweets = request.get('tweets', None)
twitter_user_id=request['twitter_user_id']
previous_max_id=request['previous_max_id']
total_requested=request['total_requested']
pulled_in=request['pulled_in']
remaining_requested = total_requested - pulled_in
if previous_max_id:
remaining_requested += 1 #this is because cursored results will always have one overlapping id
else:
previous_max_id = random.getrandbits(128) # for first time through loop
new_max_id = min([tweet['id'] for tweet in tweets])
test = lambda x, y: x<y
if remaining_requested < 0: #because we overshoot by requesting batches of 200
remaining_requested = 0
if tweets:
previous_tweets.extend(tweets)
if tweets and remaining_requested and (pulled_in > 1) and test(new_max_id, previous_max_id):
request = dict(user_pk=user_pk,
twitter_user_id=twitter_user_id,
max_id = new_max_id,
total_requested = remaining_requested,
tweets=previous_tweets)
#problem happens in this part of the logic???
response = (twitter_getter_config.s(request) | get_tweets_for_user_id.s() | timeline_recursor.s()).apply_async()
else: #if in base case, combine all tweets pulled in thus far and send back as "tweets" -- to be
#saved in db or otherwise consumed
response = dict(
twitter_user_id=twitter_user_id,
total_requested = total_requested,
tweets=previous_tweets)
return response
My expected response for res.result is therefore a dictionary comprised of a twitter user id, a requested number of tweets, and the set of tweets pulled in across successive calls. However, all is not well in recursive task land. When i run the chain identified above, if I enter res.status right after initiating chain, it indicates "SUCCESS", even though in the log view of my celery worker, I can see that chained recursive calls to the twitter api are being made as expected, with the correct parameters. I can also immediately run result.result even as chained tasks are being executed. res.result yields an AsyncResponse instance id. Even after recursively chained tasks have finished running, res.result remains an AsyncResult id.
On the other hand, I can access my set of full tweets by going to res.result.result.result.result['tweets']. I can deduce that each of the chained chained subtasks is indeed occuring, I just don't understand why res.result doesn't have the expected result. The recursive returns that should be happening when timeline_recursor gets its base case don't appear to be propagating as expected.
Any thoughts on what can be done? Recursion in Celery can get quite powerful, but to me at least, it's not totally apparent how we should be thinking of recursion and recursive functions that utilize Celery and how this affects the logic of return statements in chained tasks.
Happy to clarify as needed, and thanks in advance for any advice.
what does apply_async return ( as in type of object )?
i don't know celery, but in Twisted and many other async frameworks... a call to something like that would immediately return ( usually True or perhaps an object that can track state ) as the tasks are deferred into the queue.
again, Not knowing celery , i would guess that this is happening:
you are: defining response immediately as the async deferred task, but then trying to act on it as if results have come in
you want to be: defining a callback routine to run on the results and return a value, once the task has been completed
looking at the celery docs, apply_async accepts callbacks via link - and i couldn't find any example of someone trying to capture a return value from it.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With