
Python threads, how do Event and Queue work together?


I was talking with a friend after looking at this example from Beazley's book:

from queue import Queue
from threading import Thread, Event

class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()

    def send(self, msg):
        self._mailbox.put(msg)

    def recv(self):
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg

    def close(self):
        self.send(ActorExit)

    def start(self):
        self._terminated = Event()
        t = Thread(target=self._bootstrap)
        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()

    def join(self):
        self._terminated.wait()

    def run(self):
        while True:
            msg = self.recv()

class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)

My friend argues that the sole purpose of the Event is to block the main thread until the other thread performs the set operation. Is that true? How can we watch thread execution?

Richard Rublev asked Aug 29 '18 10:08





1 Answer

Python threads, how do Event and Queue work together?

They don't depend on each other. You can use an Event without a Queue and a Queue without an Event. Your example just happens to use both: the Queue carries messages to the actor's thread, and the Event signals that this thread has finished.
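
To illustrate that independence, here is a minimal sketch that uses each primitive on its own. The names `wait_for_flag`, `consume`, `seen` and `received` are made up for this example and do not come from the book's code.

```python
import threading
from queue import Queue

# Event alone: a thread waits for a flag, no queue involved.
ready = threading.Event()
seen = []

def wait_for_flag():
    ready.wait()                  # blocks until ready.set() is called
    seen.append("flag was set")

t1 = threading.Thread(target=wait_for_flag)
t1.start()
ready.set()
t1.join()

# Queue alone: data is handed over, no Event involved.
q = Queue()
received = []

def consume():
    # iter(callable, sentinel) calls q.get() until it returns None
    for item in iter(q.get, None):
        received.append(item)

t2 = threading.Thread(target=consume)
t2.start()
for item in ("a", "b"):
    q.put(item)
q.put(None)                       # sentinel: tells the consumer to stop
t2.join()

print(seen)      # ['flag was set']
print(received)  # ['a', 'b']
```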

My friend argues that the sole purpose of the Event is to block the main thread until the other thread performs the set operation. Is that true?

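Not quite. Calling .wait() on an Event object blocks whichever thread calls it, not just the main thread, until some thread sets the internal flag with .set(). If the flag is already set, .wait() returns immediately.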

If you look at the source for Event, you'll find that an Event just consists of a Condition variable with a lock and a boolean flag, plus methods to change that flag and to notify waiting threads of the change.

class Event:

    """Class implementing event objects.
    Events manage a flag that can be set to true with the set() method and reset
    to false with the clear() method. The wait() method blocks until the flag is
    true.  The flag is initially false.
    """

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False
    ...
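
As a rough sketch of those semantics (not taken from the book or from the original example), the snippet below starts three hypothetical `waiter` threads on one Event, shows `wait()` timing out while the flag is still false, and then releases all waiters with a single `set()`.

```python
import threading

flag = threading.Event()
results = []
results_lock = threading.Lock()

def waiter(name):
    # wait() returns True once the flag is set, False if the timeout expires
    was_set = flag.wait(timeout=5)
    with results_lock:
        results.append((name, was_set))

threads = [threading.Thread(target=waiter, args=(f"w{i}",)) for i in range(3)]
for t in threads:
    t.start()

# Nobody has called set() yet, so this wait times out and returns False.
timed_out = not flag.wait(timeout=0.1)

flag.set()                   # wakes every waiting thread, not only one
for t in threads:
    t.join()

already_set = flag.wait()    # returns True immediately, the flag is set
flag.clear()                 # reset the flag to False

print(sorted(results))       # [('w0', True), ('w1', True), ('w2', True)]
print(timed_out, already_set, flag.is_set())  # True True False
```

So the main thread is just one possible caller: any number of threads can block on the same Event.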

How can we watch thread execution?

A simple method would be to apply some sort of utility function that prints out what you're interested in, for example:

import sys
import threading

def print_info(info=""):
    """Print calling function's name and thread with optional info-text."""
    calling_func = sys._getframe(1).f_code.co_name
    thread_name = threading.current_thread().name  # getName() is deprecated since Python 3.10
    print(f"<{thread_name}, {calling_func}> {info}", flush=True)

Another possibility would be to use logging like in this answer.
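
A minimal sketch of the logging approach, assuming you only need the thread and function name in front of each message. `%(threadName)s` and `%(funcName)s` are standard LogRecord attributes; the `worker` function and the thread name "Worker-1" are made up for this example.

```python
import logging
import threading

# Put the thread name and the calling function in front of every message.
logging.basicConfig(
    level=logging.DEBUG,
    format="<%(threadName)s, %(funcName)s> %(message)s",
)
log = logging.getLogger(__name__)

def worker():
    log.debug("running")

t = threading.Thread(target=worker, name="Worker-1")
t.start()
t.join()
```

With this configuration, the call inside `worker` should produce a line like `<Worker-1, worker> running` on stderr. Unlike the print-based helper, the level and format can be changed in one place.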


Not sure what Beazley wanted to demonstrate with the code you showed, but it seems a little over-engineered to me for this simple task. Using an Event on top is unnecessary when you already have a queue: you can initiate thread termination by passing a sentinel value and wait for the thread to finish with Thread.join().

Here's a simplified version of your example with a sentinel ('STOP') and some info prints using print_info from above:

import sys
import time
import threading
from queue import Queue


class Actor(threading.Thread):

    def __init__(self):
        super().__init__()  # run() is overridden, so no target is needed
        self.queue = Queue()

    def send(self, msg):
        self.queue.put(msg)
        print_info(f"sent: {msg}")  # DEBUG

    def close(self):
        print_info()  # DEBUG
        self.send('STOP')

    def run(self):
        for msg in iter(self.queue.get, 'STOP'):
            pass


class PrintActor(Actor):
    def run(self):
        for msg in iter(self.queue.get, 'STOP'):
            print_info(f"got: {msg}")  # DEBUG


if __name__ == '__main__':

    pa = PrintActor()
    pa.start()
    pa.send("Hello")
    time.sleep(2)
    pa.send("...World!")
    time.sleep(2)
    pa.close()
    pa.join()

Output:

<MainThread, send> sent: Hello
<Thread-1, run> got: Hello
<MainThread, send> sent: ...World!
<Thread-1, run> got: ...World!
<MainThread, close> 
<MainThread, send> sent: STOP
Darkonaut answered Sep 28 '22 09:09
