 

Python terminates at threading.join() randomly

I have a Python method that uses 4 threads. I need to process some data in these 4 threads, wait until all of them have finished, and then proceed.

The issue is that sometimes the script works as expected: it processes the data in the threads and then proceeds with the code after thread.join(). Other times, however, the method randomly terminates at thread.join(). I'm stuck finding the exact cause and fixing it.

Here's my code:

def check_gaze(self) -> List[List[int]]:
    folder_path = self._gaze_preprocess()

    .
    .
    .
    
    def process_files(files):
        output = []

        for _file in files:
            try:
                ...
            except:
                print("error")

        return output

    r1, r2, r3, r4 = [], [], [], []

    t1 = threading.Thread(target=lambda: r1.extend(process_files(file_groups[0])))
    t2 = threading.Thread(target=lambda: r2.extend(process_files(file_groups[1])))
    t3 = threading.Thread(target=lambda: r3.extend(process_files(file_groups[2])))
    t4 = threading.Thread(target=lambda: r4.extend(process_files(file_groups[3])))

    threads = [t1, t2, t3, t4]
    
    print("before start")

    for thread in threads:
        thread.start()
    
    print("after start")

    print("before join")

    for t in threads:
        t.join()
    
    print("after join")

    # Merge the results from all four threads and save to a CSV file
    output = r1 + r2 + r3 + r4

    data = self._gaze_postprocess()

    return data


obj = Gaze()

print("pre call")

gaze_output = obj.check_gaze()

print("post call")

Here's the output for the issue:

pre call
before start
after start
before join

Here's the terminal output (it lacks some debug print statements, but I have checked that the program does not go beyond thread.join()):

[Screenshot: console output]


Edit: Using Louis Lac's solution from their answer posted below, I'm still facing the same issue. Based on my debugging so far, I can say that the issue is somewhere in the gaze detection package I'm using, because if I use dummy code in its place, I don't get the issue anymore!

Code:

from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []

    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)
            output.append([frame_number, gaze, gaze])
            
        except Exception as e:
            print(e)
            output.append([frame_number, 'No face', 'No face'])

    print(f'Output is : {output}')
    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results


def main():
    start = time.time()
    
    jpg_files = list(Path('test_9/img').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    
    
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    
    results = check_gaze(file_groups)
    
    print(results)
    end = time.time()
    print(f'The script took {(end - start) /60} minutes')

if __name__ == "__main__":
    main()

[Screenshot: results output]


Solution

I fixed my code based on Lie Ryan's answer below by using a ProcessPoolExecutor instead of a ThreadPoolExecutor. I use multiprocessing instead of multithreading here because the problem with threads comes from the eye_game package. The code is open source, but I haven't had time to go through it and find exactly where the issue arises.

Here's my solution.

from concurrent.futures import ProcessPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []

    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)

            output.append([frame_number, gaze, gaze])

        except Exception as e:
            print(e)

            output.append([frame_number, 'No face', 'No face'])

    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    # Each file group runs in its own worker process, so eye_game's global
    # (non-thread-safe) state is never shared between workers.
    with ProcessPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)

    return results


def _debug_main():
    start = time.time()
    
    jpg_files = list(Path('path-to-directory').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    
    results = check_gaze(file_groups)
    
    print(results)
    
    end = time.time()
    
    print(f'The script took {(end - start) /60} minutes')

if __name__ == "__main__":
    _debug_main()
asked Jun 04 '26 09:06 by stuckoverflow


2 Answers

A few recommendations:

  • use raw threads (threading.Thread) as a last resort; in your case you could use a ThreadPoolExecutor instead,
  • do not catch all possible exceptions (except:); usually you only want to catch non-exit exceptions (except Exception:), since a bare except: also swallows SystemExit and KeyboardInterrupt,
  • handle errors inside the except block instead of just printing them; you can raise/re-raise an error, or return None or an empty list if that makes sense,
  • avoid mutating shared state from different threads; this can lead to bugs if not handled correctly (for instance, if two threads mutate the same shared state at the same time),
  • wrap your main entry point in an if __name__ == "__main__": block, since leaving it out can cause issues with multiprocessing.

Here is an example which respects these guidelines:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def process_files(files: list) -> list:
    output = []

    for file_ in files:
        sleep(0.5)
        try:
            output.append(file_)
        except Exception as e:
            print(e)
            raise ValueError("unexpected value received") from e

    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor() as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results


def main():
    file_groups = [["a", "b"], ["c"], ["d", "e", "f"]]
    results = check_gaze(file_groups)
    print(results)


if __name__ == "__main__":
    main()
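To illustrate why the first recommendation matters, here is a minimal sketch (the work function and its failure are made up for the demo): an exception raised inside a plain threading.Thread target is only handed to threading.excepthook, and join() returns as if nothing happened, whereas ThreadPoolExecutor.map re-raises the same exception in the calling thread when you iterate over the results.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def work(item: str) -> str:
    # Hypothetical worker that fails on one specific input.
    if item == "bad":
        raise ValueError(f"cannot process {item!r}")
    return item.upper()


def with_plain_thread() -> list:
    results = []
    old_hook = threading.excepthook
    # Silence the default traceback print so the demo output stays readable.
    threading.excepthook = lambda args: None
    try:
        t = threading.Thread(target=lambda: results.append(work("bad")))
        t.start()
        t.join()  # returns normally: the caller never sees the ValueError
    finally:
        threading.excepthook = old_hook
    return results  # simply empty, no exception


def with_executor() -> str:
    with ThreadPoolExecutor() as pool:
        try:
            list(pool.map(work, ["ok", "bad"]))
        except ValueError as e:
            # The worker's exception is re-raised here, in the caller.
            return f"caught in caller: {e}"
    return "no error"


if __name__ == "__main__":
    print(with_plain_thread())  # []
    print(with_executor())      # caught in caller: cannot process 'bad'
```

With raw threads, a failing worker just leaves its result list short, which is easy to miss; with the executor the failure surfaces where the results are consumed.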

There is indeed shared state hidden deep in the face_recognition dependency of the eye_game package. If you dig into eye_game.get_gaze_direction(), you'll see that, deep down the call tree, it calls face_recognition.face_locations(), which accesses a face detector instantiated as a global variable (here). That detector calls into a C++ library (dlib), which is likely not thread-safe.

From this observation, you have a few options to address the issue:

  • Use a lock/mutex around the code that uses the shared state to prevent concurrent access. This will work with threads; however, it defeats the purpose of multithreading, since the calls are now sequential and any speed gain is lost.
  • Use multiprocessing instead, as advised in another answer. However, keep in mind that the global state (the face detector here) will be instantiated multiple times, once per process. This can lead to high memory usage if the model is large and could trigger Out Of Memory (OOM) errors.
answered Jun 07 '26 09:06 by Louis Lac


Maybe your program crashed with a segfault or core dump? In that case the crash is uncontrolled: your program may terminate immediately, without any Python-level exception handling.

In pure Python code, uncontrolled crashes like that are extremely rare, but they are not uncommon when you call into a native library and there's a bug in that library, or you pass an argument of the wrong type, or you call the library in ways that are not thread-safe. The library you're calling here is face-tracking software, which almost certainly contains native code internally.
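One way to check the segfault hypothesis, sketched below assuming CPython on Linux: the standard-library faulthandler module dumps the Python traceback of every thread when the interpreter receives a fatal signal such as SIGSEGV. The child program here crashes on purpose with ctypes.string_at(0) (the same trick the faulthandler documentation uses); its try/except never fires, because a segfault is not a Python exception. For the real script no code change should be needed: running it as python -X faulthandler your_script.py, or with the PYTHONFAULTHANDLER environment variable set, should print where each thread was when the process died.

```python
import subprocess
import sys
import textwrap

# Child program: enable faulthandler, then deliberately crash in native code.
CHILD = textwrap.dedent("""
    import ctypes
    import faulthandler
    faulthandler.enable()    # dump tracebacks on SIGSEGV, SIGABRT, ...
    try:
        ctypes.string_at(0)  # read from address 0 -> segfault inside C code
    except Exception:
        print("caught")      # never reached: a segfault is not an exception
""")


def run_child() -> subprocess.CompletedProcess:
    return subprocess.run(
        [sys.executable, "-c", CHILD], capture_output=True, text=True
    )


if __name__ == "__main__":
    proc = run_child()
    print("return code:", proc.returncode)  # negative on POSIX: killed by a signal
    print(proc.stderr)                      # "Fatal Python error: ..." plus tracebacks
```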

If you replace the multithreading with multiprocessing, does the program still crash? If it does, then there may be a bug in the library, or there may be issues with the arguments you pass into it (e.g. it expects a string, but you pass in an int).

If the code only crashes when multithreaded, try rewriting it to use multiprocessing. If the program crashes with multithreading but not with multiprocessing, then the library is likely not thread-safe: it may have internal global state, and calling it from multiple threads may cause a crash that wouldn't happen in single-threaded code.
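A related reason multiprocessing helps here: if a worker process dies from a native crash, ProcessPoolExecutor reports it to the parent as concurrent.futures.process.BrokenProcessPool instead of silently taking the whole program down. The sketch below simulates that with a hypothetical risky_native_call that segfaults on one input. It assumes Linux and requests the "fork" start method explicitly only so it works as a single self-contained file; on macOS or Windows, drop mp_context and keep the __main__ guard.

```python
import ctypes
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def risky_native_call(x: int) -> int:
    """Hypothetical stand-in for a native library call; x == 3 segfaults."""
    if x == 3:
        ctypes.string_at(0)  # dereference address 0 -> SIGSEGV in the worker
    return x * x


def run_isolated(inputs: list) -> str:
    # Assumption: Linux, where the "fork" start method is available.
    ctx = multiprocessing.get_context("fork")
    try:
        with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
            results = list(pool.map(risky_native_call, inputs))
        return f"ok: {results}"
    except BrokenProcessPool:
        # The parent survives: the crash is reported as an exception
        # instead of killing the interpreter that called join()/map().
        return "a worker process crashed"


if __name__ == "__main__":
    print(run_isolated([1, 2]))     # ok: [1, 4]
    print(run_isolated([1, 2, 3]))  # a worker process crashed
```

Compare this with threads: the same segfault in a thread kills the whole interpreter, which from the outside looks exactly like a program that "terminates at join()".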

Does the issue still happen if you deep-copy your input parameters before you pass them to the thread?

from copy import deepcopy
import threading

t1 = threading.Thread(
    target=lambda files: r1.extend(process_files(files)),
    args=[deepcopy(file_groups[0])],  # each thread gets its own copy of the input
)

If deep-copying the input parameters fixes the issue, then the input parameters may have shared mutable state, and if the library isn't fully thread-safe, that may be the cause of the problem.

Note that even objects that are only read in Python are actually mutated at the native-code level because of reference counting. If the library uses the C macros provided by CPython to manage reference counts and acquires and releases the GIL correctly, this should normally not be an issue; but if it doesn't, there is a good chance that it will segfault or dump core.
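A small CPython-specific illustration of the reference-counting point: sys.getrefcount shows that merely binding a second name to an object writes to that object's reference count, although the Python code never modifies the object itself. A list is used because small ints, None and similar objects are immortal in CPython 3.12+ and their counts do not move. The absolute counts vary between CPython versions, so the sketch only looks at the difference.

```python
import sys


def refcount_delta() -> int:
    data = ["frame_001.jpg"]  # a fresh, non-immortal object
    before = sys.getrefcount(data)
    alias = data              # "read-only" use: just another reference
    after = sys.getrefcount(data)
    del alias
    return after - before     # positive: the object's header was written to


if __name__ == "__main__":
    print(refcount_delta())
```

Every such increment and decrement is a write to shared memory; native code that touches Python objects without holding the GIL can race on exactly these writes.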

answered Jun 07 '26 10:06 by Lie Ryan


