I have a Python method that uses 4 threads. I have to process some data in these 4 threads, wait until all of them complete, and then proceed.
The issue is that the script sometimes works as expected: it runs the threads, then proceeds with the code after thread.join(). However, the method randomly terminates at thread.join(). I'm stuck finding the exact issue and fixing it.
Here's my code:
def check_gaze(self) -> List[List[int]]:
    folder_path = self._gaze_preprocess()
    .
    .
    .
    def process_files(files):
        output = []
        for _file in files:
            try:
                ...
            except:
                print("error")
        return output

    r1, r2, r3, r4 = [], [], [], []
    t1 = threading.Thread(target=lambda: r1.extend(process_files(file_groups[0])))
    t2 = threading.Thread(target=lambda: r2.extend(process_files(file_groups[1])))
    t3 = threading.Thread(target=lambda: r3.extend(process_files(file_groups[2])))
    t4 = threading.Thread(target=lambda: r4.extend(process_files(file_groups[3])))
    threads = [t1, t2, t3, t4]
    print("before start")
    for thread in threads:
        thread.start()
    print("after start")
    print("before join")
    for t in threads:
        t.join()
    print("after join")
    # Merge the results from all four threads and save to a CSV file
    output = r1 + r2 + r3 + r4
    data = self._gaze_postprocess()
    return data

obj = Gaze()
print("pre call")
gaze_output = obj.check_gaze()
print("post call")
Here's the output for the issue:
pre call
before start
after start
before join
The terminal output lacks some debug print statements, but I have checked that the program does not go beyond thread.join().

Edit: Using Louis Lac's solution from their answer posted below, I'm still facing the same issue. Based on my debugging so far, I can say that the issue is somewhere in the gaze detection package I'm using, because if I use dummy code in its place I don't get the issue anymore!
code:
from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []
    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)
            output.append([frame_number, gaze, gaze])
        except Exception as e:
            print(e)
            output.append([frame_number, 'No face', 'No face'])
    print(f'Output is : {output}')
    return output

def check_gaze(file_groups: list[list]) -> list:
    results = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results

def main():
    start = time.time()
    jpg_files = list(Path('test_9/img').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    results = check_gaze(file_groups)
    print(results)
    end = time.time()
    print(f'The script took {(end - start) / 60} minutes')

if __name__ == "__main__":
    main()
I fixed my code looking at Lie Ryan's answer below and used a ProcessPool instead of a ThreadPool. The reason for using multiprocessing here instead of multithreading is that the issue with threads is due to the eye_game package. While the code is open source, I haven't had the time to go through it and see where exactly the issues arise.
Here's my solution.
from concurrent.futures import ProcessPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []
    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)
            output.append([frame_number, gaze, gaze])
        except Exception as e:
            print(e)
            output.append([frame_number, 'No face', 'No face'])
    return output

def check_gaze(file_groups: list[list]) -> list:
    results = []
    # Each group runs in its own process, so the library's global state is not shared
    with ProcessPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results

def _debug_main():
    start = time.time()
    jpg_files = list(Path('path-to-directory').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    results = check_gaze(file_groups)
    print(results)
    end = time.time()
    print(f'The script took {(end - start) / 60} minutes')

if __name__ == "__main__":
    _debug_main()
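A few recommendations:
- don't manage threads by hand, use a ThreadPoolExecutor instead,
- don't use a bare except clause (except:), usually you only want to catch non-exit exceptions (except Exception:),
- when an exception is caught, either re-raise it or return a default such as None or an empty list if this is relevant,
- wrap the main entry point in an if __name__ == "__main__": block, as it can cause issues with multiprocessing if not present.
Here is an example which respects these guidelines: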
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def process_files(files: list) -> list:
    output = []
    for file_ in files:
        sleep(0.5)
        try:
            output.append(file_)
        except Exception as e:
            print(e)
            raise ValueError("un-expected value received")
    return output

def check_gaze(file_groups: list[list]) -> list:
    results = []
    with ThreadPoolExecutor() as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results

def main():
    file_groups = [["a", "b"], ["c"], ["d", "e", "f"]]
    results = check_gaze(file_groups)
    print(results)

if __name__ == "__main__":
    main()
There is indeed shared state hidden deep in the face_recognition dependency of the eye_game package. If you dig inside eye_game.get_gaze_direction(), you'll see that deep down the call tree there is a call to face_recognition.face_locations(), which accesses a face detector instantiated as a global variable. This then calls into a C++ library (dlib) which is likely not thread-safe.
From this observation, you have a few solutions to address the issue:
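One option that follows from the shared global detector is to serialize the calls into the library with a lock. The sketch below is only an illustration: detect_gaze() is a hypothetical stand-in for eye_game.get_gaze_direction(), and whether a lock is actually sufficient depends on the library's internals, which have not been verified here.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# One lock shared by every worker thread.
_detector_lock = threading.Lock()


def detect_gaze(path: str) -> str:
    """Hypothetical placeholder for eye_game.get_gaze_direction()."""
    return f"gaze-for-{path}"


def process_files(files: list) -> list:
    output = []
    for file_ in files:
        # Only one thread at a time may enter the non-thread-safe call.
        with _detector_lock:
            gaze = detect_gaze(file_)
        output.append([file_, gaze])
    return output


def check_gaze(file_groups: list) -> list:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # pool.map returns results in the same order as file_groups.
        return list(pool.map(process_files, file_groups))


if __name__ == "__main__":
    print(check_gaze([["a.jpg", "b.jpg"], ["c.jpg"]]))
```

Because the lock lets only one detection run at a time, the threads bring little speedup for the detection itself. Running each group in a separate process, as the asker ended up doing, avoids sharing the global state altogether.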
Maybe your program crashed with a segfault or core dump? Those are uncontrolled crashes: the program may terminate immediately, without any Python-level exception handling running.
In pure Python code, an uncontrolled crash like that is extremely rare, but it is not uncommon for a crash to end in a segfault/core dump if you call into a native library and there's a bug in that library, or if you pass an argument of the wrong type, or if you call the library in ways that are not thread-safe. The library you're calling here appears to be face-tracking software, which almost certainly has some native code internally.
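To check whether a native crash is what is happening, the standard library's faulthandler module can dump the Python traceback of every thread when the interpreter receives a fatal signal. This is a minimal sketch; enable_crash_traces() is a helper name made up for this example.

```python
import faulthandler
import sys


def enable_crash_traces(log_file=sys.stderr) -> None:
    """Dump all thread tracebacks to log_file on a fatal signal such as SIGSEGV.

    log_file must be a real file object (it needs a file descriptor) and must
    stay open for as long as the handler is enabled.
    """
    faulthandler.enable(file=log_file, all_threads=True)
```

Calling enable_crash_traces() once at the start of the script, before the threads are started, should be enough. The same handler can be turned on without code changes by running the script with python -X faulthandler.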
If you replace the multithreading with multiprocessing, does the program still crash? If it does, that may be a bug in the library, or there is an issue with the arguments you pass to it (e.g. it expects a string, but you passed an int).
If the program crashes with multithreading but not with multiprocessing, then the library is likely not thread-safe. It may have internal global state, and calling it from multiple threads may cause a crash that doesn't happen in single-threaded code.
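When comparing runs, the exit status of the process helps tell a native crash apart from a normal exit. The sketch below runs a snippet in a separate interpreter and returns its exit status; run_isolated() is a hypothetical helper, and os.abort() merely simulates a crash in native code.

```python
import subprocess
import sys


def run_isolated(snippet: str) -> int:
    """Run a Python snippet in a separate interpreter and return its exit status."""
    completed = subprocess.run(
        [sys.executable, "-c", snippet],
        stderr=subprocess.DEVNULL,  # hide the child's crash message in this demo
    )
    return completed.returncode


if __name__ == "__main__":
    print(run_isolated("print('fine')"))          # clean exit
    print(run_isolated("import os; os.abort()"))  # simulated native crash
```

A clean run returns 0. On POSIX systems, a negative value -N means the child was terminated by signal N, which is what a segfault or abort looks like from the outside.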
Does the issue still happen if you deep-copy your input parameters before passing them to the thread?
from copy import deepcopy

t1 = threading.Thread(
    target=lambda files: r1.extend(process_files(files)),
    args=[deepcopy(file_groups[0])],
)
If deep-copying the input parameters fixes the issue, then there may have been shared mutable state between the input parameters, and if the library isn't fully thread-safe, that may be the cause of the issue.
Note that even objects that are read-only at the Python level are actually mutated at the native level because of reference counting. If the library uses the C macros provided by CPython to handle reference counts and acquires and releases the GIL correctly, this should normally not be an issue; if it doesn't, there is a good chance that it may core dump/segfault.
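The reference-counting point can be observed from Python itself. In this small sketch, which assumes a standard CPython build, merely taking another reference to a list changes the counter stored inside the object, even though the list's contents are never modified.

```python
import sys

data = ["frame_001.jpg"]        # a list the code only ever reads
before = sys.getrefcount(data)

alias = data                    # "read-only" use: just another reference
after = sys.getrefcount(data)

# The object's internal reference counter changed although its contents did not.
print(before, after)
```

Native code that touches such counters without holding the GIL can corrupt them, which is one way a library that looks read-only can still crash under threads.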