Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Twisted: why is it that passing a deferred callback to a deferred thread makes the thread blocking all of a sudden?

I unsuccessfully tried using txredis (the non blocking twisted api for redis) for a persisting message queue I'm trying to set up with a scrapy project I am working on. I found that although the client was not blocking, it became much slower than it could have been because what should have been one event in the reactor loop was split up into thousands of steps.

So instead, I tried making use of redis-py (the regular blocking twisted api) and wrapping the call in a deferred thread. It works great, however I want to perform an inner deferred when I make a call to redis as I would like to set up connection pooling in attempts to speed things up further.

Below is my interpretation of some sample code taken from the twisted docs for a deferred thread to illustrate my use case:

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

And here is my alteration for connection pooling that makes the code in the deferred thread blocking :

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

So my question is, does anyone know why my alteration causes the deferred thread to be blocking and/or can anyone suggest a better solution?

like image 732
surtyaar Avatar asked Mar 17 '10 21:03

surtyaar


1 Answers

Well, as the twisted docs say:

Deferreds do not make the code magically not block

Whenever you're using blocking code, such as sleep, you have to defer it to a new thread.

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

In case the redis api is not very complex it might be more natural to rewrite it using twisted.web, instead of just calling the blocking api in a lot threads.

like image 68
Jochen Ritzel Avatar answered Nov 09 '22 16:11

Jochen Ritzel