async def run_check(shell_command):
p = await asyncio.create_subprocess_shell(shell_command,
stdin=PIPE, stdout=PIPE, stderr=STDOUT)
fut = p.communicate()
try:
pcap_run = await asyncio.wait_for(fut, timeout=5)
except asyncio.TimeoutError:
p.kill()
await p.communicate()
def get_coros(pcap_list):
for pcap_loc in pcap_list:
for pcap_check in get_pcap_executables():
tmp_coro = (run_check('{args}'
.format(e=sys.executable, args=args)))
if tmp_coro != False:
coros.append(tmp_coro)
return coros
async def main():
pcap_list_gen = print_dir_cointent()
for pcap_list in pcap_list_gen:
p_coros = get_coros(pcap_list)
for f in asyncio.as_completed(p_coros):
res = await f
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
pcap_list_gen is a generator of PCAP lists which contain multile lists. I get [OS: Error: To many open file] if I pass all pcaps in a single list, so decided to group them into list of smaller size and process one at a time.
Each pcap_list is a list of multiple PCAPS. I want the next iteration of the loop to only start once the first iteration is finished.
for pcap_list in pcap_list_gen:
p_coros = get_coros(pcap_list)
for f in asyncio.as_completed(p_coros):
res = await f
As per my knowledge: The first loop is getting executed properly asa the iteration goes to next loop it throws error.
Traceback:
Exception ignored in: <generator object BaseSubprocessTransport._connect_pipes at 0x7f7b7c165830>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 188, in _connect_pipes
waiter.set_exception(exc)
File "/usr/lib/python3.5/asyncio/futures.py", line 349, in set_exception
self._schedule_callbacks()
File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
self._loop.call_soon(callback, self)
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
handle = self._call_soon(callback, args)
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
self._check_closed()
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
[ERROR] Task was destroyed but it is pending!
task: <Task pending coro=<BaseSubprocessTransport._connect_pipes() done, defined at /usr/lib/python3.5/asyncio/base_subprocess.py:156> wait_for=<Future pending cb=[Task._wakeup()]>>
More logs
RuntimeError: Event loop is closed
Exception ignored in: <bound method BaseSubprocessTransport.__del__ of <_UnixSubprocessTransport closed pid=70435 running stdin=<_UnixWritePipeTransport closing fd=12 open>>>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
[ERROR] Task exception was never retrieved
future: <Task finished coro=<ClassificationCheck.run_check() done, defined at ./regression.py:159> exception=RuntimeError('cannot reuse already awaited coroutine',)>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
RuntimeError: cannot reuse already awaited coroutine
O/P of _p_coros_ after for loop iteration in main() as
P_COROS(1st iteration): [<coroutine object ClassificationCheck.run_check at 0x7f746d984ca8>, <coroutine object ClassificationCheck.run_check at 0x7f746d984db0>, <coroutine object ClassificationCheck.run_check at 0x7f746d984f68>, <coroutine object ClassificationCheck.run_check at 0x7f746d984fc0>]
awating in block 'finally'
awating in block 'finally'
awating in block 'finally'
awating in block 'finally'
P_COROS(2nd iteration): [<coroutine object ClassificationCheck.run_check at 0x7f746d984ca8>, <coroutine object ClassificationCheck.run_check at 0x7f746d984db0>, <coroutine object ClassificationCheck.run_check at 0x7f746d984f68>, <coroutine object ClassificationCheck.run_check at 0x7f746d984fc0>, <coroutine object ClassificationCheck.run_check at 0x7f746d943048>, <coroutine object ClassificationCheck.run_check at 0x7f746d9430f8>]
Traceback (most recent call last):
File "./regression.py", line 325, in <module>
loop.run_until_complete(ClassCheck.main())
File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "./regression.py", line 201, in main
res = await f
File "/usr/lib/python3.5/asyncio/tasks.py", line 492, in _wait_for_one
return f.result() # May raise f.exception().
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
RuntimeError: cannot reuse already awaited coroutine
Exception ignored in: <bound method BaseEventLoop.__del__ of <_UnixSelectorEventLoop running=False closed=True debug=False>>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_events.py", line 431, in __del__
File "/usr/lib/python3.5/asyncio/unix_events.py", line 58, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 139, in remove_signal_handler
File "/usr/lib/python3.5/signal.py", line 47, in signal
TypeError: signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object
Exception ignored in: <bound method BaseSubprocessTransport.__del__ of <_UnixSubprocessTransport closed pid=89531 running stdin=<_UnixWritePipeTransport closing fd=8 open>>>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
A Task is created and scheduled for its execution through the asyncio. create_task() function. Once scheduled, a Task can be requested for cancellation through task. cancel() method.
Run the event loop until stop() is called. If stop() is called before run_forever() is called, the loop will poll the I/O selector once with a timeout of zero, run all callbacks scheduled in response to I/O events (and those that were already scheduled), and then exit.
Does Asyncio run return? asyncio. run(coro) will run coro , and return the result. It will always start a new event loop, and it cannot be called when the event loop is already running.
ensure_future is a method to create Task from coroutine . It creates tasks in different ways based on argument (including using of create_task for coroutines and future-like objects). create_task is an abstract method of AbstractEventLoop . Different event loops can implement this function different ways.
Warning says that at the moment you call loop.close()
something related to process still running. I guess you should wait() for it's termination (read also note at the link). Try this:
try:
pcap_run = await asyncio.wait_for(fut, timeout=5)
except asyncio.TimeoutError:
p.terminate()
finally:
await p.wait()
Upd:
Oh, you probably used global coros
variable:
def get_coros(pcap_list):
coros = [] # <--------------- create new list to fill
for pcap_loc in pcap_list:
for pcap_check in get_pcap_executables():
tmp_coro = (run_check('{args}'
.format(e=sys.executable, args=args)))
if tmp_coro != False:
coros.append(tmp_coro)
return coros
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With