Python equivalent of Golang's select on channels

Tags: python, go

Go has a select statement that works on channels. From the documentation:

The select statement lets a goroutine wait on multiple communication operations.

A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.

Is there a Python equivalent of the following code?

package main

import "fmt"

func main() {
    c1 := make(chan int)
    c2 := make(chan int)
    quit := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            c1 <- i
        }
        quit <- 0
    }()

    go func() {
        for i := 0; i < 2; i++ {
            c2 <- i
        }
    }()

    for {
        select {
        case <-c1:
            fmt.Println("Received value from c1")
        case <-c2:
            fmt.Println("Received value from c2")
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

Sample output of this program (the interleaving of c1 and c2 can vary between runs):

Received value from c1
Received value from c1
Received value from c2
Received value from c1
Received value from c2
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
quit

Cenk Alti asked Oct 02 '13 06:10


4 Answers

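Here's a fairly direct translation, but the "choose one at random if multiple are ready" part works differently: it simply takes whichever message came in first. It also behaves like running your Go code with GOMAXPROCS(1), since in CPython the global interpreter lock lets only one thread execute Python bytecode at a time.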

import threading
import Queue

def main():
    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    combined = Queue.Queue(maxsize=0)

    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))

    t = threading.Thread(target=listen_and_forward, args=(c1,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(c2,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(quit,))
    t.daemon = True
    t.start()

    while True:
        which, message = combined.get()
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()

The main change is that select is simulated with threads that forward every message into one combined queue. If you were going to use this pattern a lot, you might factor it into a select helper:

import threading
import Queue

def select(*queues):
    combined = Queue.Queue(maxsize=0)
    def listen_and_forward(queue):
        while True:
            combined.put((queue, queue.get()))
    for queue in queues:
        t = threading.Thread(target=listen_and_forward, args=(queue,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

def main():

    c1 = Queue.Queue(maxsize=0)
    c2 = Queue.Queue(maxsize=0)
    quit = Queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print 'Received value from c1'
        elif which is c2:
            print 'Received value from c2'
        elif which is quit:
            print 'Received value from quit'
            return
main()

But...

Note that this select isn't quite the Go one, though it doesn't matter for your program: a message sent on a channel can be pulled into the select's combined queue and then lost if we don't always iterate over the select to completion.
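
For Python 3, the same forwarding idea might look like the sketch below. It is not the answer's original code. The `send` helper is a hypothetical addition: it uses `Queue.join()` and `task_done()` so that a sender waits until its value has been handled, roughly imitating an unbuffered Go channel. Without something like it, `quit` could in principle overtake values still sitting in `c1`, because the queues are unbounded and each one is forwarded by its own thread.

```python
import queue
import threading


def select(*queues):
    """Yield (source_queue, item) pairs as items arrive on any of the queues."""
    combined = queue.Queue()

    def listen_and_forward(q):
        while True:
            combined.put((q, q.get()))

    for q in queues:
        threading.Thread(target=listen_and_forward, args=(q,), daemon=True).start()
    while True:
        yield combined.get()


def send(q, item):
    """Hypothetical helper: block until the consumer calls q.task_done()."""
    q.put(item)
    q.join()


def main():
    c1, c2, quit_q = queue.Queue(), queue.Queue(), queue.Queue()

    def func1():
        for i in range(10):
            send(c1, i)
        quit_q.put(0)  # only reached after all ten values were handled

    def func2():
        for i in range(2):
            send(c2, i)

    threading.Thread(target=func1, daemon=True).start()
    threading.Thread(target=func2, daemon=True).start()

    log = []
    for which, _msg in select(c1, c2, quit_q):
        if which is quit_q:
            log.append("quit")
            return log
        log.append("c1" if which is c1 else "c2")
        which.task_done()  # releases the sender blocked in send()


if __name__ == "__main__":
    for name in main():
        print(name if name == "quit" else "Received value from " + name)
```

As in the Go program, nothing waits for func2, so the number of c2 messages seen before quit is not guaranteed.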

Thomas answered Nov 05 '22 20:11


Also consider the offset library by Benoit Chesneau. It is a port of the Go concurrency model to Python, using fibers under the covers.

He gave a presentation about this at PyCon APAC 2013:

  • Slides
  • Video

Brian Dorsey answered Nov 05 '22 21:11


You can use multiprocessing.Pipe instead of chan, threading.Thread instead of go and select.select instead of select.

Here's a reimplementation of your Go example in Python using this approach:

import random
from multiprocessing import Pipe
from select import select
from threading import Thread


def main():
    c1_r, c1_w = Pipe(duplex=False)
    c2_r, c2_w = Pipe(duplex=False)
    quit_r, quit_w = Pipe(duplex=False)

    def func1():
        for i in range(10):
            c1_w.send(i)
        quit_w.send(0)

    Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2_w.send(i)

    Thread(target=func2).start()

    while True:
        ready, _, _ = select([c1_r, c2_r, quit_r], [], [])
        which = random.choice(ready)
        if which == c1_r:
            c1_r.recv()
            print 'Received value from c1'
        elif which == c2_r:
            c2_r.recv()
            print 'Received value from c2'
        elif which == quit_r and len(ready) == 1:
            quit_r.recv()
            print 'Received value from quit'
            return

if __name__ == '__main__':
    main()

This implementation is based upon @Thomas's implementation, but unlike @Thomas's it doesn't spawn extra threads to perform the select.

Tested on Linux with Python 2.7.13. It is unlikely to work on Windows, where select.select only accepts sockets, not pipes.

Edit: I added the len(ready) == 1 condition so quit is only handled after the other pipes are drained. This isn't required in Go because the channels are unbuffered (zero capacity), so func1 can't send on quit until its last message on c1 has been received. Thanks to the comment by @Sean Perry.
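
On Python 3, a variant of the code above could use multiprocessing.connection.wait (available since Python 3.3) in place of select.select. This is a sketch under that assumption rather than the tested code from this answer. wait is documented to accept Connection objects on both Unix and Windows. The `log` list is only there so the result can be inspected.

```python
import random
from multiprocessing import Pipe
from multiprocessing.connection import wait
from threading import Thread


def main():
    c1_r, c1_w = Pipe(duplex=False)
    c2_r, c2_w = Pipe(duplex=False)
    quit_r, quit_w = Pipe(duplex=False)

    def func1():
        for i in range(10):
            c1_w.send(i)
        quit_w.send(0)

    def func2():
        for i in range(2):
            c2_w.send(i)

    Thread(target=func1).start()
    Thread(target=func2).start()

    log = []
    while True:
        # wait() blocks until at least one connection has data to read
        # and returns the list of ready connections.
        ready = wait([c1_r, c2_r, quit_r])
        which = random.choice(ready)
        if which is c1_r:
            c1_r.recv()
            log.append("c1")
        elif which is c2_r:
            c2_r.recv()
            log.append("c2")
        elif len(ready) == 1:
            # Only quit is ready, so the other pipes are drained for now.
            quit_r.recv()
            log.append("quit")
            return log


if __name__ == "__main__":
    for name in main():
        print(name if name == "quit" else "Received value from " + name)
```

The random.choice call keeps the "pick one at random if several are ready" behaviour, and the len(ready) == 1 check plays the same role as in the Python 2 version.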

Will Manley answered Nov 05 '22 20:11


Python 3.5 introduced the async and await keywords, which make it possible to write functions that can be suspended mid-execution and can therefore run on an event loop instead of threads. The asyncio standard library module provides one.
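
As a rough illustration using only asyncio, the example could be approximated as below. This is a sketch, not an established recipe: it assumes Python 3.7+ (for asyncio.run and asyncio.create_task), keeps one pending get() task per queue, and uses a hypothetical `send` helper built on Queue.join() and task_done() to imitate an unbuffered channel.

```python
import asyncio


async def send(q, item):
    """Hypothetical helper: return only after the consumer calls q.task_done()."""
    await q.put(item)
    await q.join()


async def main():
    c1, c2, quit_q = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()

    async def func1():
        for i in range(10):
            await send(c1, i)
        await quit_q.put(0)

    async def func2():
        for i in range(2):
            await send(c2, i)

    producers = [asyncio.create_task(func1()), asyncio.create_task(func2())]
    # One pending get() per queue; the dict remembers each task's queue.
    getters = {asyncio.create_task(q.get()): q for q in (c1, c2, quit_q)}
    log = []
    try:
        while True:
            done, _ = await asyncio.wait(
                list(getters), return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                q = getters.pop(task)
                task.result()  # the received value, unused here
                if q is quit_q:
                    log.append("quit")
                    return log
                log.append("c1" if q is c1 else "c2")
                q.task_done()  # releases the sender waiting in send()
                getters[asyncio.create_task(q.get())] = q  # re-arm this queue
    finally:
        # Cancel whatever is still waiting so nothing is left pending.
        for task in list(getters) + producers:
            task.cancel()


if __name__ == "__main__":
    for name in asyncio.run(main()):
        print(name if name == "quit" else "Received value from " + name)
```

Unlike Go's select, this handles every queue that became ready in the same round instead of picking one at random.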

To more directly map the behaviour of Go blocking channels and select you might make use of this small library and then your example code would look very similar in Python.

pothos answered Nov 05 '22 21:11