Go has a select statement that works on channels. From the documentation:
The select statement lets a goroutine wait on multiple communication operations.
A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.
Is there a Python equivalent of the following code:
package main
import "fmt"
func main() {
c1 := make(chan int)
c2 := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
c1 <- i
}
quit <- 0
}()
go func() {
for i := 0; i < 2; i++ {
c2 <- i
}
}()
for {
select {
case <-c1:
fmt.Println("Received value from c1")
case <-c2:
fmt.Println("Received value from c2")
case <-quit:
fmt.Println("quit")
return
}
}
}
Output of this program:
Received value from c1
Received value from c1
Received value from c2
Received value from c1
Received value from c2
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
quit
Here's a pretty direct translation, but the "choosing which if multiple are ready" part works differently - it's just taking what came in first. Also this is like running your code with gomaxprocs(1)
.
import threading
import Queue
def main():
c1 = Queue.Queue(maxsize=0)
c2 = Queue.Queue(maxsize=0)
quit = Queue.Queue(maxsize=0)
def func1():
for i in range(10):
c1.put(i)
quit.put(0)
threading.Thread(target=func1).start()
def func2():
for i in range(2):
c2.put(i)
threading.Thread(target=func2).start()
combined = Queue.Queue(maxsize=0)
def listen_and_forward(queue):
while True:
combined.put((queue, queue.get()))
t = threading.Thread(target=listen_and_forward, args=(c1,))
t.daemon = True
t.start()
t = threading.Thread(target=listen_and_forward, args=(c2,))
t.daemon = True
t.start()
t = threading.Thread(target=listen_and_forward, args=(quit,))
t.daemon = True
t.start()
while True:
which, message = combined.get()
if which is c1:
print 'Received value from c1'
elif which is c2:
print 'Received value from c2'
elif which is quit:
print 'Received value from quit'
return
main()
The basic change is simulating the select with threads that combine messages. If you were going to use this pattern much, you might write some select code:
import threading
import Queue
def select(*queues):
combined = Queue.Queue(maxsize=0)
def listen_and_forward(queue):
while True:
combined.put((queue, queue.get()))
for queue in queues:
t = threading.Thread(target=listen_and_forward, args=(queue,))
t.daemon = True
t.start()
while True:
yield combined.get()
def main():
c1 = Queue.Queue(maxsize=0)
c2 = Queue.Queue(maxsize=0)
quit = Queue.Queue(maxsize=0)
def func1():
for i in range(10):
c1.put(i)
quit.put(0)
threading.Thread(target=func1).start()
def func2():
for i in range(2):
c2.put(i)
threading.Thread(target=func2).start()
for which, msg in select(c1, c2, quit):
if which is c1:
print 'Received value from c1'
elif which is c2:
print 'Received value from c2'
elif which is quit:
print 'Received value from quit'
return
main()
Note that this select isn't quite the go one, though it doesn't matter for your program - a goroutine could send a result on a channel that would be queued up in the select and lost if we didn't always iterate over the select to completion!
Also consider the offset library by Benoit Chesneau. It is a port of the Go concurrency model to Python, using fibers under the covers.
He gave a presentation about this at PyCon APAC 2013:
You can use multiprocessing.Pipe
instead of chan
, threading.Thread
instead of go
and select.select
instead of select
.
Here's a reimplementation of your go example in Python using this approach:
import random
from multiprocessing import Pipe
from select import select
from threading import Thread
def main():
c1_r, c1_w = Pipe(duplex=False)
c2_r, c2_w = Pipe(duplex=False)
quit_r, quit_w = Pipe(duplex=False)
def func1():
for i in range(10):
c1_w.send(i)
quit_w.send(0)
Thread(target=func1).start()
def func2():
for i in range(2):
c2_w.send(i)
Thread(target=func2).start()
while True:
ready, _, _ = select([c1_r, c2_r, quit_r], [], [])
which = random.choice(ready)
if which == c1_r:
c1_r.recv()
print 'Received value from c1'
elif which == c2_r:
c2_r.recv()
print 'Received value from c2'
elif which == quit_r and len(ready) == 1:
quit_r.recv()
print 'Received value from quit'
return
if __name__ == '__main__':
main()
This implementation is based upon @Thomas's implementation, but unlike @Thomas's it doesn't spawn extra threads to perform the select.
Tested on Linux with Python 2.7.13. Windows may behave differently as select is a Unixy thing.
Edit: I added the len(ready) == 1
condition so quit is only handled after the other pipes are drained. This isn't required in Go as the channels are zero sized, so func1
can't send a message to quit_w
until after the message sent to c1_w
has been received. Thanks to the comment by @Sean Perry.
With Python 3.5 there are the keywords async
and await
which make it possible to have functions which can be suspended in execution and thus are able to run on an evenloop instead of threads. The asyncio
std lib is offering one.
To more directly map the behaviour of Go blocking channels and select
you might make use of this small library and then your example code would look very similar in Python.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With