I am creating a project in Python and I would like to add a monitoring system that makes use of events and event handlers. I would like this system to be available throughout the project. I have the following operations in mind:
How can I create such a system? Are there any libraries that can help me with this? I am especially wondering how I can make this system in such a way that it is transparently available throughout the project.
You can do most of what you want with forty lines of Python code. This is my own design that I use all the time. The function names are chosen to make it a drop-in replacement for Qt's "signals" and "slots".
It's simple to use. You create a PSignal. You register handlers by calling the connect method. A handler can be any callable. When an event occurs you emit a signal (i.e., notify an event) by calling the emit method. Every registered callable runs at that point. The object calling emit doesn't know, or care, whether anyone is listening or what happens if they are.
You can also disconnect a handler.
There is a lot of debugging code because I discovered that otherwise certain errors can be difficult to track down.
In your question you wanted each handler to be a monitor, and in my design handlers are just functions. But it seems to me that your "monitor" concept is independent of the event/handler mechanism. You're going to have to write functions to make your application go, and it should be pretty easy to make those functions call your monitors.
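As a rough sketch of how a "monitor" could sit on top of plain handlers: a bound method is a callable, so a monitor object can register one of its methods directly. The names MiniSignal, CountingMonitor, file_saved and save_file are made up for illustration and are not part of the original design; MiniSignal is only a stripped-down stand-in so the snippet runs on its own. Defining the signal at module level (for example in a shared events.py, which is an assumption here) is one way to make it available throughout the project.

```python
class MiniSignal:
    """Hypothetical stand-in for PSignal: connect and emit only."""

    def __init__(self):
        self._handlers = []

    def connect(self, f):
        self._handlers.append(f)

    def emit(self, *args, **kwargs):
        for f in self._handlers:
            f(*args, **kwargs)


class CountingMonitor:
    """Hypothetical monitor: counts events and remembers the last payload."""

    def __init__(self, name):
        self.name = name
        self.count = 0
        self.last = None

    def on_event(self, *args, **kwargs):
        self.count += 1
        self.last = (args, kwargs)


# Module-level signal. In a real project this could live in a shared
# module (e.g. events.py, an assumed name) and be imported where needed.
file_saved = MiniSignal()

monitor = CountingMonitor("saves")
file_saved.connect(monitor.on_event)  # a bound method is a valid handler


def save_file(path):
    # Application code only emits; it knows nothing about the monitor.
    file_saved.emit(path, size=123)


save_file("a.txt")
save_file("b.txt")
print(monitor.name, monitor.count, monitor.last)
```

The application function and the monitor never reference each other; the only thing they share is the signal object.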
The code is extensively tested with Python 3.3.
#! python3
import traceback

class PSignal:
    def __init__(self, debug=False):
        self.debug = debug
        self.__handlers = []

    def clear(self):
        """Deletes all the handlers."""
        self.__handlers.clear()

    def connect(self, f):
        """f is a python function."""
        if not callable(f):
            raise ValueError("Object {!r} is not callable".format(f))
        self.__handlers.append(f)
        if self.debug:
            print("PSIGNAL: Connecting", f, self.__handlers)

    def disconnect(self, f):
        for f1 in self.__handlers:
            if f == f1:
                self.__handlers.remove(f)
                return

    def emit(self, *x, **y):
        self._emit(*x, **y)

    def check_debug(self):
        if self.debug and self.__handlers:
            print("PSIGNAL: Signal emitted")
            traceback.print_stack()

    def _emit(self, *x, **y):
        self.check_debug()
        for f in self.__handlers:
            try:
                if self.debug:
                    print("PSIGNAL: emit", f, len(x), x, y)
                f(*x, **y)
            except Exception:
                print("PSIGNAL: Error in signal", f)
                traceback.print_exc()
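A short usage sketch, assuming the behaviour described above. The class is repeated here in condensed form (debug output removed) only so the snippet runs on its own; the handler names and the temperature_changed signal are invented for the example. It shows connect, emit with positional and keyword arguments, disconnect, and that a failing handler does not stop the others from running.

```python
import traceback


class PSignal:
    """Condensed copy of the class above, without the debug output."""

    def __init__(self):
        self._handlers = []

    def connect(self, f):
        if not callable(f):
            raise ValueError("Object {!r} is not callable".format(f))
        self._handlers.append(f)

    def disconnect(self, f):
        if f in self._handlers:
            self._handlers.remove(f)

    def emit(self, *args, **kwargs):
        for f in self._handlers:
            try:
                f(*args, **kwargs)
            except Exception:
                # An error in one handler is reported, then the loop goes on.
                print("PSIGNAL: Error in signal", f)
                traceback.print_exc()


received = []


def on_temperature(value, unit="C"):
    received.append((value, unit))


def broken_handler(value, unit="C"):
    raise RuntimeError("handler failed")


temperature_changed = PSignal()
temperature_changed.connect(broken_handler)   # runs first and raises
temperature_changed.connect(on_temperature)   # still runs afterwards

temperature_changed.emit(21.5)
temperature_changed.emit(70, unit="F")

temperature_changed.disconnect(on_temperature)
temperature_changed.emit(0)  # only broken_handler is left to be called

print(received)
```

After the disconnect call, the third emit no longer reaches on_temperature, so received holds only the first two readings.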
Check out Reactive Python (RxPy), part of ReactiveX; the Python implementation is on GitHub.