
Set behavior to be executed when a thread would otherwise finish

My module has two functions in it: do_something(), and change_behavior().

The function do_something() does Thing A by default. After change_behavior() has been called, do_something() does Thing B instead.

I want this transition to be thread-specific. That is, any new thread will have Thing A happen when it calls do_something(), but if that thread calls change_behavior(), then Thing B will happen instead when it continues to call do_something().

Each thread should be independent, so that one thread calling change_behavior() does not affect the behavior of do_something() for other threads.


My instinctive solution to this is to tie the behavior to the thread's ID (obtained via threading.get_ident()). The function do_something() checks a module-level registry for the calling thread's ID and adjusts its behavior accordingly. Meanwhile, the function change_behavior() simply adds the current thread's ID to that registry. This works at any given time because there are never two concurrent threads with the same ID.

The problem comes in when the current set of threads joins, and time passes, and the parent thread makes a whole bunch more threads. One of the new threads has the same ID as one of the previous threads, because thread IDs are reused sometimes. That thread calls do_something(), and because it's already in the registry, it does Thing B instead of Thing A.

To fix this, I need to remove the thread ID from the registry somehow, between when the first thread with that ID ends and when the second thread with that ID starts. Some hypothetical ideas I've come up with:

  • Periodically check whether each thread ID is still active. This is problematic because it wastes CPU resources and can still miss a thread that is destroyed and whose ID is reused between ticks.
  • Attach a method hook to be called whenever the thread joins. I'm not sure how to do this, besides the next idea.
  • As part of change_behavior(), hijack/replace the current thread's ._quit() method with one that first removes the thread's ID from the registry. This seems like bad practice, and potentially breaking.
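Another option, not among the ideas listed above, would be to key the registry on the Thread object rather than on its ID and to hold it only by weak reference, so that no explicit cleanup hook is needed. This is a sketch under that assumption; _changed_threads and run_in_thread are hypothetical names:

```python
import threading
import weakref

# Hypothetical registry of Thread objects, held by weak reference so that
# an entry disappears once its Thread object is garbage collected.
_changed_threads = weakref.WeakSet()
_registry_lock = threading.Lock()

def change_behavior():
    with _registry_lock:
        _changed_threads.add(threading.current_thread())

def do_something():
    with _registry_lock:
        changed = threading.current_thread() in _changed_threads
    return "Thing B" if changed else "Thing A"

def run_in_thread(func):
    """Helper for the demo: run func in a fresh thread and return its result."""
    out = []
    t = threading.Thread(target=lambda: out.append(func()))
    t.start()
    t.join()
    return out[0]

def switch_then_do():
    change_behavior()
    return do_something()

first = run_in_thread(switch_then_do)   # this thread switched to Thing B
second = run_in_thread(do_something)    # a later, unrelated thread
```

Because membership is decided by the Thread object and not by the numeric ID, a later thread that happens to receive a recycled ID still starts with Thing A.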

Another aspect of my use case is that, if possible, I'd like new threads to inherit the current behavior of their parent threads, so that the user doesn't have to manually set the flag for every thread they create - but this is more relevant to how I store the information about the state of the thread than it is to when the thread finishes, which makes it marginally less relevant to this particular question.

I'm looking for guidance on whether any of these particular solutions are ideal, standard, or idiomatic, and whether there's an intended thing to do in this use case.


Using threading.local() was suggested in the comments by @TarunLalwani. I've investigated it, and it is useful, but it doesn't account for the other use case I'd like to take care of - when a parent thread creates new subthreads, I want them to inherit the state of the parent thread. I was thinking of accomplishing this by replacing Thread.__init__(), but using local() would be incompatible with this use case in general, since I wouldn't be able to pass variables from parent to child threads.
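For comparison, here is one way threading.local() and inheritance might be combined without replacing Thread.__init__() globally. It relies on the fact that a Thread's __init__() runs in the creating thread while run() executes in the new one. InheritingThread is a hypothetical subclass, and the sketch only covers threads created through it:

```python
import threading

_state = threading.local()  # each thread sees its own attributes

def current_behavior():
    # A thread that never set a value falls back to the default, "A".
    return getattr(_state, "behavior", "A")

def change_behavior():
    _state.behavior = "B"

class InheritingThread(threading.Thread):
    """Hypothetical Thread subclass that copies its creator's behavior."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # __init__ runs in the creating (parent) thread, so this reads
        # the parent's value at construction time.
        self._inherited_behavior = current_behavior()

    def run(self):
        # run() executes in the new thread, so this writes the child's own slot.
        _state.behavior = self._inherited_behavior
        super().run()

seen = []
change_behavior()  # the parent switches to B

child = InheritingThread(target=lambda: seen.append(current_behavior()))
child.start()
child.join()

plain = threading.Thread(target=lambda: seen.append(current_behavior()))
plain.start()
plain.join()
```

The child snapshots the parent's value when it is constructed, not when it is started, and later changes in either thread stay independent. A plain threading.Thread still starts with the default.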


I've also been experimenting, more successfully, with simply saving my attributes to the threads themselves:

current_thread = threading.current_thread()
setattr(current_thread, my_reference, new_value)

The problem with this is that, for a reason which completely mystifies me, any other variable in the module's namespace whose value is currently current_thread.my_reference also gets set to new_value. I have no idea why, and I've been unable to replicate the problem in an MVE (though it happens consistently in my IDE, even after restarting it). As my other currently-active question implies, the objects I'm setting here are references to output streams: every reference to an instance of the intermediary IO streaming I described in that answer is getting replaced by the file descriptor with which this method is being called. I can't imagine why the type of object would affect how references work in this case, but I mention it in case it has anything to do with it.
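As a baseline for comparison, this is how setattr normally behaves: it rebinds one attribute on one object and leaves other names that refer to the old value alone. The sketch does not reproduce the behavior described above; the attribute name and the placeholder objects are made up for illustration:

```python
import threading

current_thread = threading.current_thread()
my_reference = "_my_module_stream"  # hypothetical attribute name

old_value = object()  # stand-ins for the real stream objects
new_value = object()

setattr(current_thread, my_reference, old_value)
other_name = getattr(current_thread, my_reference)  # second reference to old_value

# Rebinding the attribute does not touch other_name.
setattr(current_thread, my_reference, new_value)

attribute_now = getattr(current_thread, my_reference)
```

After this runs, other_name still refers to old_value and only the thread attribute refers to new_value.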

Green Cloak Guy asked Jul 09 '19 19:07



1 Answer

My answer is a very simple answer to your question, hence I wonder if I missed something. Basically, I think you should avoid storing the current state of external objects in your module.

You need to store the state (whether change_behavior() was called, and maybe some other data) somewhere. You have two main options: store the state in the module or store the state in the thread itself. Aside from the issues you had when storing the state in the module, one expects a module to be (mainly) stateless, hence I think you should stick to the latter and store the data in the thread.

Version 1

If you store the state in an attribute of the thread, there is a small risk of collision between the name of the attribute you create and the names of existing attributes, but if the documentation is clear and you choose a good name, that should not be an issue.

A simple proof of concept, without setattr or hasattr (I didn't check the source code of CPython but maybe the weird behavior comes from setattr):

module1.py

import threading
import random
import time

_lock = threading.Lock()

def do_something():
    with _lock:
        t = threading.current_thread()
        try:
            if t._my_module_s:
                print(f"DoB ({t})")
            else:
                print(f"DoA ({t})")
        except AttributeError:
            t._my_module_s = 0
            print(f"DoA ({t})")

    time.sleep(random.random()*2)

def change_behavior():
    with _lock:
        t = threading.current_thread()
        print(f"Change behavior of: {t}")
        t._my_module_s = 1

test1.py

import random
import threading
from module1 import *

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        n = random.randint(1, 10)
        for i in range(n):
            do_something()
        change_behavior()
        for i in range(10-n):
            do_something()

thread_1 = MyThread()
thread_2 = MyThread()
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()

Output 1

DoA (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoA (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-1, started 140155115792128)>)
Change behavior of: <MyThread(Thread-1, started 140155115792128)>
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoA (<MyThread(Thread-2, started 140155107399424)>)
Change behavior of: <MyThread(Thread-2, started 140155107399424)>
DoB (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoB (<MyThread(Thread-1, started 140155115792128)>)
DoB (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-2, started 140155107399424)>)
DoB (<MyThread(Thread-2, started 140155107399424)>)

Version 2

If you are sure that the end user will use your module inside threads, you can give them a convenient way to do that. The idea is to handle the threads yourself: wrap the user function in a thread, and store the state in that thread as above. The difference is that you own the Thread subclass, so you avoid the risk of name collision. Plus, the code becomes, in my opinion, cleaner:

module2.py

import threading
import random
import time

_lock = threading.Lock()

def do_something():
    with _lock:
        t = threading.current_thread()
        t.do_something() # t must be a _UserFunctionWrapper
    time.sleep(random.random()*2)

def change_behavior():
    with _lock:
        t = threading.current_thread()
        t.change_behavior() # t must be a _UserFunctionWrapper

def wrap_in_thread(f):
    return _UserFunctionWrapper(f)

class _UserFunctionWrapper(threading.Thread):
    def __init__(self, user_function):
        threading.Thread.__init__(self)
        self._user_function = user_function
        self._s = 0

    def change_behavior(self):
        print(f"Change behavior of: {self}")
        self._s = 1

    def do_something(self):
        if self._s:
            print(f"DoB ({self})")
        else:
            print(f"DoA ({self})")

    def run(self):
        self._user_function()

test2.py

import random
from module2 import *

def user_function():
    n = random.randint(1, 10)
    for i in range(n):
        do_something() # won't work if the function is not wrapped
    change_behavior()
    for i in range(10-n):
        do_something()

thread_1 = wrap_in_thread(user_function)
thread_2 = wrap_in_thread(user_function)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()

Output 2

DoA (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
Change behavior of: <_UserFunctionWrapper(Thread-1, started 140193896072960)>
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoA (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
Change behavior of: <_UserFunctionWrapper(Thread-2, started 140193887680256)>
DoB (<_UserFunctionWrapper(Thread-2, started 140193887680256)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)
DoB (<_UserFunctionWrapper(Thread-1, started 140193896072960)>)

The drawback is that you have to use a thread even if you don't need it.
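If that drawback matters, one possible mitigation is to let unwrapped threads fall back to the default behavior instead of failing. This is only a sketch of a variation on Version 2, not part of the code above. The fallback via getattr and the RuntimeError are assumptions, and the functions return strings instead of printing:

```python
import threading

class _UserFunctionWrapper(threading.Thread):
    def __init__(self, user_function):
        super().__init__()
        self._user_function = user_function
        self._s = 0

    def run(self):
        self._user_function()

def do_something():
    t = threading.current_thread()
    # Unwrapped threads have no _s attribute, so they get the default (DoA).
    return "DoB" if getattr(t, "_s", 0) else "DoA"

def change_behavior():
    t = threading.current_thread()
    if not isinstance(t, _UserFunctionWrapper):
        raise RuntimeError("change_behavior() needs a wrapped thread")
    t._s = 1

log = []

def user_function():
    log.append(do_something())
    change_behavior()
    log.append(do_something())

w = _UserFunctionWrapper(user_function)
w.start()
w.join()
unwrapped = do_something()  # called outside any wrapper
```

With this variation, code that never needs Thing B can call do_something() without creating a thread. Only change_behavior() still requires the wrapper.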

jferard answered Sep 30 '22 20:09