I want to check the reachability of about 100 IP addresses and limit the number of concurrent tasks with a semaphore. But I'm not sure how this works exactly, or why it doesn't work in the code example below. As far as I can observe, the function "task_reachable" is still executed correctly. However, if no address is reachable, then in "try_ssh_connection" all tasks are executed in parallel, and this makes the code incredibly slow.
class test_class:
    """Reachability-check example that tries to cap concurrency with a semaphore.

    As written, the cap does not take effect: see the note in
    ``boundary_task``.
    """

    def __init__(self):
        self.username, self.password = "username", "password"
        # A file object, not a list: iterating it yields one line per entry
        # (trailing newline included). Nothing in this class closes it.
        self.ips = open("open_ip_list")

    def create_async(self):
        """Synchronous entry point: drive ``create_tasks`` via ``asyncio.run``."""
        main_coro = self.create_tasks()
        asyncio.run(main_coro)

    async def boundary_task(self, ip):
        """Await ``task_test(ip)`` while holding a semaphore and return its result."""
        # NOTE: the semaphore is constructed on every call, so each task
        # acquires its own private Semaphore(2) and never waits on another.
        sem = asyncio.Semaphore(2)
        async with sem:
            outcome = await self.task_test(ip)
        return outcome

    async def create_tasks(self):
        """Schedule one ``boundary_task`` per entry of ``self.ips`` and await them all."""
        # Timer is not defined in this snippet; presumably codetiming.Timer -
        # TODO confirm against the real imports.
        timer = Timer(text=f" task time: {{:.1f}}")
        timer.start()
        pending = []
        for entry in self.ips:
            pending.append(asyncio.ensure_future(self.boundary_task(entry)))
        await asyncio.gather(*pending)
        timer.stop()

    async def task_test(self):
        """Placeholder for the real per-address check; does nothing.

        NOTE: declared without an ``ip`` parameter although ``boundary_task``
        calls it as ``self.task_test(ip)``.
        """
        return None
# Script entry point: build the example object and run every task to completion.
if __name__ == "__main__":
    app = test_class()
    app.create_async()
Your problem is each running instance of boundary_task has its own semaphore.
# Excerpt of the problem: a new Semaphore(2) is constructed on every call, so
# each task acquires a semaphore that no other task ever touches.
async def boundary_task(self, ip):
sem = asyncio.Semaphore(2)
If you want them to all use the same semaphore, all instances of boundary_task need to share it.
async def boundary_task(self, ip, semaphore):
    """Run the reachability check for ``ip`` while holding ``semaphore``.

    Args:
        ip: The address handed on to ``self.task_reachable``.
        semaphore: The semaphore shared by all concurrent calls; its initial
            value is the maximum number of checks that run at the same time.

    Returns:
        Whatever ``self.task_reachable(ip)`` returns.
    """
    # Use the parameter itself: the body previously referred to ``sem``,
    # a name that does not exist in this scope (NameError on first call).
    async with semaphore:
        return await self.task_reachable(ip)
async def create_tasks(self):
    """Check every entry of ``self.ips``, at most two at a time.

    A single ``Semaphore(2)`` is created here and handed to every
    ``boundary_task`` call, so all of them contend for the same two slots.
    """
    sem = asyncio.Semaphore(2)
    checks = (self.boundary_task(address, sem) for address in self.ips)
    # Unpacking consumes the generator, creating every coroutine up front.
    await asyncio.gather(*checks)
Since you are using a class, you could also create the semaphore inside __init__.
def __init__(self):
    ...
    # One semaphore per instance, shared by every boundary_task call on it.
    # NOTE(review): on Python versions before 3.10, asyncio primitives appear
    # to bind to an event loop at construction time, which can clash with the
    # fresh loop that asyncio.run() starts later - confirm for your version.
    self.sem = asyncio.Semaphore(2)

async def boundary_task(self, ip):
    """Run ``task_reachable(ip)`` while holding the instance-wide semaphore."""
    async with self.sem:
        reachable = await self.task_reachable(ip)
    return reachable
If you like what we do, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!
Donate Us With