Using the new asyncio in Python 3.4, how do I acquire the first lock/semaphore that becomes available from a set of locks/semaphores?
The approach I took was to use wait(return_when=FIRST_COMPLETED), then cancel the acquire() calls that are still pending once I've managed to acquire one. But I'm concerned that this may cause subtle bugs / race conditions, and I have the feeling that there is a more elegant way of doing it.
import asyncio as aio

@aio.coroutine
def run():
    sem1, sem2 = (aio.Semaphore(), aio.Semaphore())
    print('initial:', sem1, sem2)
    a = aio.async(sleep(sem1, 1))  # acquire sem1
    print('just after sleep:', sem1, sem2)
    done, pending = yield from aio.wait([sem1.acquire(), sem2.acquire()],
                                        return_when=aio.FIRST_COMPLETED)
    print('done:', done)
    print('pending:', pending)
    for task in pending:
        task.cancel()
    print('after cancel:', sem1, sem2)
    yield from aio.wait([a])
    print('after wait:', sem1, sem2)

@aio.coroutine
def sleep(sem, i):
    with (yield from sem):
        yield from aio.sleep(i)

if __name__ == "__main__":
    aio.get_event_loop().run_until_complete(run())
The code above gives (memory addresses edited):
initial: <asyncio.locks.Semaphore object at 0x1 [unlocked,value:1]> <asyncio.locks.Semaphore object at 0x2 [unlocked,value:1]>
just after sleep: <asyncio.locks.Semaphore object at 0x1 [unlocked,value:1]> <asyncio.locks.Semaphore object at 0x2 [unlocked,value:1]>
done: {Task(<acquire>)<result=True>}
pending: {Task(<acquire>)<PENDING>}
after cancel: <asyncio.locks.Semaphore object at 0x1 [locked,waiters:1]> <asyncio.locks.Semaphore object at 0x2 [locked]>
after wait: <asyncio.locks.Semaphore object at 0x1 [unlocked,value:1]> <asyncio.locks.Semaphore object at 0x2 [locked]>
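For reference, this is the kind of helper I've been sketching to paper over the most obvious race: if wait() reports that more than one acquire() finished at the same moment, the extras get released instead of leaking. The acquire_first name and overall shape are my own sketch, and I'm not confident it covers every cancellation edge case, which is exactly why I'm asking:

@aio.coroutine
def acquire_first(*sems):
    # Sketch only: race one acquire() task per semaphore, keep the first
    # winner, and give back any other semaphore that was also acquired
    # before its task could be cancelled.
    tasks = [aio.async(sem.acquire()) for sem in sems]
    done, _pending = yield from aio.wait(tasks, return_when=aio.FIRST_COMPLETED)
    winner = None
    for sem, task in zip(sems, tasks):
        if task in done:
            if winner is None:
                winner = sem   # first successful acquire; keep it
            else:
                sem.release()  # also acquired; release it again
        else:
            task.cancel()      # still pending; stop waiting on it
    return winner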
If I understand your problem correctly, you want to have two different pools of locks: one that allows X connections per proxy, and another that allows Y global connections. A single Semaphore object can actually be used for this pretty easily:
class asyncio.Semaphore(value=1, *, loop=None)
A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some other thread calls release().
So, rather than implementing the pool with a list of Semaphore objects, each initialized with the default value of 1, just initialize the value of a single Semaphore to the maximum number of tasks you want to be able to run concurrently:
proxy_sem = Semaphore(value=5)    # 5 connections can hold this semaphore concurrently
global_sem = Semaphore(value=15)  # 15 connections can hold this semaphore concurrently
Then in your code, always acquire the proxy semaphore before acquiring the global one:
with (yield from proxy_sem):
    with (yield from global_sem):
        # ... do the network work here ...
That way, you won't hold a global lock while you wait on a proxy-specific lock, which could potentially block a connection from another proxy that would be free to run if it could get a global lock.
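To make the ordering concrete, here's a minimal sketch of what a per-request coroutine might look like under this scheme. The fetch_via_proxy name and the sleep standing in for real network I/O are hypothetical; the point is only the acquisition order:

import asyncio

@asyncio.coroutine
def fetch_via_proxy(url, proxy_sem, global_sem):
    # Take the per-proxy slot first, then the global slot, so a task
    # queued behind a busy proxy never ties up a global slot.
    with (yield from proxy_sem):
        with (yield from global_sem):
            yield from asyncio.sleep(1)  # stand-in for the real request
            return url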
Edit:
Here's a complete example that demonstrates a way to do this without requiring a proxy-specific lock at all. Instead, you run one coroutine for each proxy, all of which consume from the same queue. The proxy coroutines limit the number of concurrent tasks they run simply by keeping track of the active tasks they've launched, and only starting new tasks when they drop below the limit. When a proxy coroutine launches a task, that task is responsible for acquiring the global semaphore. Here's the code:
import asyncio
import random

PROXY_CONN_LIMIT = 5
GLOBAL_CONN_LIMIT = 20
PROXIES = ['1.2.3.4', '1.1.1.1', '2.2.2.2', '3.3.3.3', '4.4.4.4']

@asyncio.coroutine
def do_network_stuff(item, proxy_info):
    print("Got {}. Handling it with proxy {}".format(item, proxy_info))
    # Wait a random amount of time to simulate actual work being done.
    yield from asyncio.sleep(random.randint(1, 7))

@asyncio.coroutine
def handle_item(item, proxy_info, global_sem):
    with (yield from global_sem):  # Get the global semaphore
        yield from do_network_stuff(item, proxy_info)

@asyncio.coroutine
def proxy_pool(proxy_info, queue, global_sem):
    tasks = []
    while True:  # Loop until we get a sentinel from main()
        while len(tasks) < PROXY_CONN_LIMIT:  # Pull from the queue until we hit our proxy limit
            item = yield from queue.get()
            print(len(tasks))
            if item is None:  # Time to shut down
                if tasks:
                    # Make sure all pending tasks are finished first.
                    yield from asyncio.wait(tasks)
                print("Shutting down {}".format(proxy_info))
                return
            # Create a task for the work item, and add it to our list of tasks.
            task = asyncio.async(handle_item(item, proxy_info, global_sem))
            tasks.append(task)
        # We've hit our proxy limit. Now we wait for at least one task
        # to complete, then loop around to pull more from the queue.
        done, pending = yield from asyncio.wait(tasks,
                                                return_when=asyncio.FIRST_COMPLETED)
        # Remove the completed tasks from the active tasks list.
        for d in done:
            tasks.remove(d)

@asyncio.coroutine
def main():
    global_sem = asyncio.Semaphore(GLOBAL_CONN_LIMIT)
    queue = asyncio.Queue()
    tasks = []
    # Start the proxy pools.
    for proxy in PROXIES:
        tasks.append(asyncio.async(proxy_pool(proxy, queue, global_sem)))
    # Send work to the proxy pools.
    for i in range(50):
        yield from queue.put(i)
    # Tell the proxy pools to shut down.
    for _ in PROXIES:
        yield from queue.put(None)
    # Wait for them to shut down.
    yield from asyncio.wait(tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Sample output:
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4
Got 0. Handling it with proxy 1.2.3.4
Got 1. Handling it with proxy 1.2.3.4
Got 2. Handling it with proxy 1.2.3.4
Got 3. Handling it with proxy 1.2.3.4
Got 4. Handling it with proxy 1.2.3.4
Got 5. Handling it with proxy 1.1.1.1
Got 6. Handling it with proxy 1.1.1.1
Got 7. Handling it with proxy 1.1.1.1
Got 8. Handling it with proxy 1.1.1.1
Got 9. Handling it with proxy 1.1.1.1
Got 10. Handling it with proxy 2.2.2.2
Got 11. Handling it with proxy 2.2.2.2
Got 12. Handling it with proxy 2.2.2.2
Got 13. Handling it with proxy 2.2.2.2
Got 14. Handling it with proxy 2.2.2.2
Got 15. Handling it with proxy 3.3.3.3
Got 16. Handling it with proxy 3.3.3.3
Got 17. Handling it with proxy 3.3.3.3
Got 18. Handling it with proxy 3.3.3.3
Got 19. Handling it with proxy 3.3.3.3
Got 20. Handling it with proxy 4.4.4.4
Got 21. Handling it with proxy 4.4.4.4
Got 22. Handling it with proxy 4.4.4.4
Got 23. Handling it with proxy 4.4.4.4
4
4
4
4
Got 24. Handling it with proxy 4.4.4.4
Got 25. Handling it with proxy 1.2.3.4
Got 26. Handling it with proxy 2.2.2.2
Got 27. Handling it with proxy 1.1.1.1
Got 28. Handling it with proxy 3.3.3.3
3
4
4
4
4
4
Got 29. Handling it with proxy 4.4.4.4
Got 30. Handling it with proxy 4.4.4.4
Got 31. Handling it with proxy 2.2.2.2
Got 32. Handling it with proxy 1.1.1.1
4
4
4
Got 33. Handling it with proxy 1.2.3.4
Got 34. Handling it with proxy 3.3.3.3
Got 35. Handling it with proxy 1.1.1.1
Got 36. Handling it with proxy 2.2.2.2
Got 37. Handling it with proxy 3.3.3.3
3
4
4
4
4
Got 38. Handling it with proxy 1.2.3.4
4
Got 39. Handling it with proxy 1.2.3.4
Got 40. Handling it with proxy 2.2.2.2
Got 41. Handling it with proxy 1.1.1.1
Got 42. Handling it with proxy 3.3.3.3
Got 43. Handling it with proxy 4.4.4.4
2
3
4
4
4
4
Got 44. Handling it with proxy 1.2.3.4
Got 45. Handling it with proxy 1.2.3.4
Got 46. Handling it with proxy 1.2.3.4
Got 47. Handling it with proxy 1.1.1.1
Got 48. Handling it with proxy 4.4.4.4
Got 49. Handling it with proxy 2.2.2.2
3
4
4
4
Shutting down 3.3.3.3
4
Shutting down 2.2.2.2
Shutting down 1.1.1.1
Shutting down 4.4.4.4
Shutting down 1.2.3.4