I'm using python-zookeeper for locking, and I'm trying to figure out a way of getting the execution to wait for notification when it's watching a file, because zookeeper.exists() returns immediately rather than blocking.

Basically, I have the code listed below, but I'm unsure of the best way to implement the notify() and wait_for_notification() functions. It could be done with os.kill() and signal.pause(), but I'm sure that's likely to cause problems if I later have multiple locks in one program. Is there a specific Python library that is good for this sort of thing?

def get_lock(zh):
    lockfile = zookeeper.create(zh, lockdir + '/guid-lock-', 'lock', [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL | zookeeper.SEQUENCE)

    while(True):
        # this won't work for more than one waiting process, fix later
        children = zookeeper.get_children(zh, lockdir)
        if len(children) == 1 and children[0] == basename(lockfile):
            return lockfile

        # yeah, there's a problem here, I'll fix it later
        for child in children:
            if child < basename(lockfile):
                break

        # exists will call notify when the watched file changes
        if zookeeper.exists(zh, lockdir + '/' + child, notify):
            # Process should wait here until notify() wakes it
            wait_for_notification()

def drop_lock(zh, lockfile):
    zookeeper.delete(zh, lockfile)

def notify(zh, unknown1, unknown2, lockfile):
    pass

def wait_for_notification():
    pass

The Condition variables from Python's threading module are probably a very good fit for what you're trying to do:
http://docs.python.org/library/threading.html#condition-objects
I've extended the example to make it a little more obvious how you would adapt it for your purposes:

#!/usr/bin/env python
from collections import deque
from threading import Thread, Condition

QUEUE = deque()

def an_item_is_available():
    return bool(QUEUE)

def get_an_available_item():
    return QUEUE.popleft()

def make_an_item_available(item):
    QUEUE.append(item)

def consume(cv):
    # Hold the condition's lock while checking and waiting.
    with cv:
        # Re-check in a loop: wait() can return without an item being ready.
        while not an_item_is_available():
            cv.wait()
        print('We got an available item: %s' % get_an_available_item())

def produce(cv):
    with cv:
        make_an_item_available('an item to be processed')
        # Wake one thread blocked in cv.wait().
        cv.notify()

def main():
    cv = Condition()
    Thread(target=consume, args=(cv,)).start()
    Thread(target=produce, args=(cv,)).start()

if __name__ == '__main__':
    main()
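
To map this back onto the question, here is a minimal sketch of how notify() and wait_for_notification() might be built on the same threading module. It uses a threading.Event rather than a Condition, since the watch only needs to signal "something changed" once. The WatchNotifier class and the demo() function are hypothetical names introduced for illustration, and the watch is simulated with a timer thread so the sketch runs without a ZooKeeper server. Creating one instance per lock attempt keeps multiple locks in one program from waking each other.

```python
import threading


class WatchNotifier(object):
    """Hypothetical helper: create one instance per lock attempt."""

    def __init__(self):
        self._event = threading.Event()
        self.path = None

    def notify(self, zh, event_type, state, path):
        # Same four-argument shape as notify() in the question. In real use
        # this is assumed to run on the ZooKeeper client's own thread.
        self.path = path
        self._event.set()

    def wait_for_notification(self, timeout=None):
        # Block until notify() has been called or the timeout expires.
        # Returns True if the watch fired, False on timeout.
        fired = self._event.wait(timeout)
        if fired:
            # Reset so the same notifier can be reused for the next watch.
            self._event.clear()
        return fired


def demo():
    notifier = WatchNotifier()
    # Stand-in for the watch firing: call notify() from another thread.
    # The None arguments and the path are placeholder values.
    timer = threading.Timer(
        0.1, notifier.notify,
        args=(None, None, None, '/locks/guid-lock-0000000001'))
    timer.start()
    fired = notifier.wait_for_notification(timeout=5)
    timer.join()
    return fired, notifier.path


if __name__ == '__main__':
    print(demo())
```

In the question's loop, the idea would be to pass notifier.notify as the watcher argument to zookeeper.exists() and then call notifier.wait_for_notification(). Because the Event stays set until it is cleared, a notification that arrives before the wait starts is not lost.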