I trying to implement the concurrent.futures have been having an issue, I generated a code that when executed in:
Python Terminal
Enters in some infinite loop in my last loop and do not finish the code, this code below do finish, however reproduces the error thgat i receive in the Debug Mode below
So when I run it in Visual Studio Code Debug Mode
Generates the folowing error:
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "C:\Users\debor\anaconda3\envs\multimidia_image\lib\concurrent\futures\process.py", line 102, in _python_exit
thread_wakeup.wakeup()
File "C:\Users\debor\anaconda3\envs\multimidia_image\lib\concurrent\futures\process.py", line 90, in wakeup
self._writer.send_bytes(b"")
File "C:\Users\debor\anaconda3\envs\multimidia_image\lib\multiprocessing\connection.py", line 183, in send_bytes
self._check_closed()
File "C:\Users\debor\anaconda3\envs\multimidia_image\lib\multiprocessing\connection.py", line 136, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
This code reproduces the error in the Debug Mode:
import numpy as np
import pandas as pd
import concurrent.futures
import multiprocessing
from itertools import product
def main_execucao(n_exec,input_n_geracoes_max,tipo_apt,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words):
# Dicionário vazio para armazenar resultados por geração n+1
resultados = {}
resultados[("teste")] = [1,0]
return resultados
def main():
# Parametros problemáticos Process
tmi = ["tm1"]
si = ["s1"]
ci = ["c1"]
ri = ["r2"]
# Lista de variantes
variantes = list(product(*[tmi, si, ci, ri]))
# Numero de execuções
input_n_execucoes = 3
# Inputs para main
# Inputs gerais
# Lista iterável para chamar função
lista_n_exec=range(0,input_n_execucoes)
# Numero de gerações
input_n_geracoes_max = [50] * len(lista_n_exec)
# Palavras
w_1=np.flip(np.array(list("send")),0)
w_2=np.flip(np.array(list("more")),0)
w_3=np.flip(np.array(list("money")),0)
words=[w_1,w_2,w_3]
words_lista=[words]* len(lista_n_exec)
for param in variantes:
# Inputs parameters
# Tipo aptidao (apt_1= aptidão simples, apt_2 aptidão invertida, apt_3 invertida normalizada pior valor)
tipo_apt = "apt_1"
tipo_apt_lista = [tipo_apt] * len(lista_n_exec)
# População inicial
input_pop_0 = [100] * len(lista_n_exec)
tipo_pop_gerar = ["sem_repeticao_populacao_inicial"] * len(lista_n_exec)
nome_selecao_cross =param[1]
tipo_selecao_crossover = [nome_selecao_cross] * len(lista_n_exec)
tour = [3] * len(lista_n_exec)# Apenas se seleção for torneio
# Tipo de crossover
nome_cross = param[2]
tipo_crossover = [nome_cross] * len(lista_n_exec)
# % de Crossover
input_p_cross = [0.8] * len(lista_n_exec)
# % de mutação
nome_mutacao = param[0]
if nome_mutacao=="tm1":
perc_mutacao=0.02
elif nome_mutacao=="tm2":
perc_mutacao=0.10
elif nome_mutacao=="tm3":
perc_mutacao=0.2
tipo_mutacao = [perc_mutacao] * len(lista_n_exec)
# Tipo de reinserção
nome_reinsercao = param[3]
tipo_reinsercao = [nome_reinsercao] * len(lista_n_exec)
# Finaliza inputs
# Index contar número de execuções e gerações
ix_exec_real=0
ger_exec_real=0
# Número total de convergencias após todas execuções
total_conv_tds_exec=0
# Dicionário para armazenar resultados de todas as n execuções
results={}
with concurrent.futures.ProcessPoolExecutor() as executor:
for result in (executor.map(main_execucao,lista_n_exec,input_n_geracoes_max,tipo_apt_lista,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words_lista)):
results.update(result)
print("finish")
if __name__ == '__main__':
multiprocessing.freeze_support()
main()
Do anyone have a clue of what i can try? I have found a similar issue: https://github.com/getsentry/sentry-python/issues/423
Many thanks
Per #434,
Python Standard Library - concurrent.futures used thread to manage tasks queue, therefore when sentry patched start method of Thread, it will cause reference cycle and prevent gc from collecting.
So in concurrent.futures.process, the _queue_management_thread in _threads_wakeups will be discarded until running a full gc collection, if not, exception will be raise when python exited, because the actual manage thread has already stopped.
The fix can be found there, permitting one to manually input it into their Python 3.8 rather than upgrading to Python 3.9.
See if the problem goes away by switching from using the concurrent.futures.ProcessPoolExecutor
class to using the multiprocess.pool.Pool
class by changing two lines:
Change from:
with concurrent.futures.ProcessPoolExecutor() as executor:
for result in (executor.map(main_execucao,lista_n_exec,input_n_geracoes_max,tipo_apt_lista,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words_lista)):
To:
with multiprocessing.Pool() as executor:
for result in (executor.starmap(main_execucao,zip(lista_n_exec,input_n_geracoes_max,tipo_apt_lista,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words_lista))):
You can also, of course, now remove the import concurrent.futures
statement.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With