
Throughput differences when using coroutines vs threading

A few days ago I asked a question on SO about designing a paradigm for structuring multiple HTTP requests.

Here's the scenario. I would like to have a multi-producer, multi-consumer system. My producers crawl and scrape a few sites and add the links that they find to a queue. Since I'll be crawling multiple sites, I would like to have multiple producers/crawlers.

The consumers/workers feed off this queue, make TCP/UDP requests to these links and save the results to my Django DB. I would also like to have multiple workers, as the queue items are totally independent of each other.

People suggested that I use a coroutine library for this, i.e. Gevent or Eventlet. Having never worked with coroutines, I read that even though the programming paradigm is similar to the threaded paradigm, only one thread is actively executing at a time. When a blocking call occurs, such as an I/O call, the stacks are switched in memory and another green thread takes over until it in turn encounters a blocking I/O call. Hopefully I got this right? Here's the code from one of my SO posts:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid


def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

This works well because the sleep calls are blocking calls, and when a sleep occurs another green thread takes over. This is a lot faster than sequential execution. As you can see, I don't have any code in my program that purposely yields the execution of one thread to another thread. I fail to see how this fits into the scenario above, as I would like to have all the threads executing simultaneously.

Everything works, but I feel that the throughput I've achieved using Gevent/Eventlet is higher than that of the original sequential program yet drastically lower than what could be achieved using real threading.

If I were to re-implement my program using threading mechanisms, each of my producers and consumers could simultaneously be working without the need to swap stacks in and out like coroutines.

Should this be re-implemented using threading? Is my design wrong? I've failed to see the real benefits of using coroutines.

Maybe my concepts are a little muddy, but this is what I've assimilated. Any help or clarification of my paradigm and concepts would be great.

Thanks

Mridang Agarwalla asked Feb 12 '12 09:02




1 Answer

As you can see, I don't have any code in my program that purposely yields the execution of one thread to another thread. I fail to see how this fits into scenario above as I would like to have all the threads executing simultaneously.

There is a single OS thread but several greenlets. In your case gevent.sleep() allows workers to execute concurrently. Blocking IO calls such as urllib2.urlopen(url).read() do the same if you use urllib2 patched to work with gevent (by calling gevent.monkey.patch_*()).

See also A Curious Course on Coroutines and Concurrency to understand how code can run concurrently in a single-threaded environment.
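
As a rough illustration of that idea, here is a minimal sketch of cooperative switching on a single thread. It uses plain generators only, not gevent, and it is written for Python 3, unlike the Python 2 listings in this thread. The names task and run_round_robin are made up for this example. Each bare yield stands in for the point where a greenlet would hit a blocking call such as gevent.sleep().

```python
from collections import deque


def task(name, steps, log):
    """A toy coroutine: record one unit of work, then give up control."""
    for step in range(steps):
        log.append((name, step))
        yield  # stands in for a blocking call such as gevent.sleep()


def run_round_robin(tasks):
    """Run generators on one thread, switching at every yield."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)  # resume the task until its next yield
        except StopIteration:
            continue  # task finished, drop it
        ready.append(current)  # not finished, back of the line


log = []
run_round_robin([task("a", 2, log), task("b", 3, log)])
print(log)
```

The printed log interleaves the steps of "a" and "b" even though only one thread ever runs. With gevent the switch happens implicitly inside patched blocking calls, so no explicit yield appears in your code.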

To compare throughput differences between gevent, threading and multiprocessing, you could write code that is compatible with all three approaches:

#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
    import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
    from Queue import Queue as JoinableQueue
    from threading import Thread
if concurrency_impl == 'multiprocessing':
    from multiprocessing import Process as Thread, JoinableQueue

The rest of the script is the same for all concurrency implementations:

def do_work(wid, value):
    time.sleep(random.randint(0,2))
    info("%d Task %s done" % (wid, value))

def worker(wid, q):
    while True:
        item = q.get()
        try:
            info("%d Got item %s" % (wid, item))
            do_work(wid, item)
        finally:
            q.task_done()
            info("%d Done item %s" % (wid, item))

def producer(pid, q):
    for item in iter(lambda: random.randint(1, 11), 10):
        time.sleep(.1) # simulate a green blocking call that yields control
        info("%d Added item %s" % (pid, item))
        q.put(item)
    info("%d Signal Received" % (pid,))

Don't execute code at module level; put it in main():

def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(process)d %(message)s")

    q = JoinableQueue()
    it = count(1)
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
    for t in producers+workers:
        t.daemon = True
        t.start()

    for t in producers: t.join() # put items in the queue
    q.join() # wait until every queued item has been marked done
    # exit main thread (daemon workers die at this point)

if __name__ == "__main__":
    main()
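
The script above logs events but does not report a throughput figure. One possible way to get a number is to time how long a pool of workers needs to drain a fixed batch of items. The sketch below does that with the standard library only and is written for Python 3. The function timed_run and its parameters are hypothetical names chosen for this example, and time.sleep() again stands in for a blocking network call.

```python
import threading
import time
from queue import Queue


def timed_run(n_workers, n_items, delay):
    """Return the seconds n_workers threads need to process n_items."""
    q = Queue()

    def worker():
        while True:
            q.get()
            try:
                time.sleep(delay)  # stand-in for a blocking network call
            finally:
                q.task_done()

    for _ in range(n_workers):
        t = threading.Thread(target=worker)
        t.daemon = True  # idle workers die when the main thread exits
        t.start()

    start = time.time()
    for item in range(n_items):
        q.put(item)
    q.join()  # returns once every item has been marked done
    return time.time() - start


sequential = timed_run(n_workers=1, n_items=8, delay=0.05)
concurrent = timed_run(n_workers=4, n_items=8, delay=0.05)
print("1 worker: %.2fs, 4 workers: %.2fs" % (sequential, concurrent))
```

Because the work here is pure waiting, four workers should finish the batch in roughly a quarter of the time one worker needs. Following the approach of the script above, calling gevent.monkey.patch_all() before the other imports should let the same function be timed on greenlets instead of OS threads, which gives a like-for-like comparison.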
jfs answered Oct 12 '22 22:10
