I have a simple script which watches Kubernetes events and then publishes a message to a NATS server:
#!/usr/bin/env python
import asyncio
import argparse
import json
import logging
import os
from kubernetes import client, config, watch
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
# Monkey patch: replace the kubernetes client's
# KubeConfigLoader._load_oid_token with local_load_oid_token from the `kube`
# module (not shown here). This must run before any load_kube_config() call
# below so that the patched method is the one used.
from kube import local_load_oid_token
config.kube_config.KubeConfigLoader._load_oid_token = local_load_oid_token
# Command-line interface, declared as (flags, keyword-options) pairs and
# registered in order so the generated --help output keeps the same ordering.
_CLI_OPTIONS = (
    (('--in-cluster',),
     dict(action="store_true", help="use in cluster kubernetes config")),
    # The NATS address falls back to the NATS_ADDRESS environment variable.
    (('-a', '--nats-address'),
     dict(default=os.environ.get('NATS_ADDRESS'), help="address of nats cluster")),
    (('-d', '--debug'),
     dict(action="store_true", help="enable debug logging")),
    (('-p', '--publish-events'),
     dict(action="store_true", help="publish events to NATS")),
    (('--output-events',),
     dict(action="store_true", dest='enable_output',
          help="output all events to stdout")),
    (('--connect-timeout',),
     dict(type=int, default=10, dest='conn_timeout',
          help="NATS connect timeout (s)")),
    (('--max-reconnect-attempts',),
     dict(type=int, default=1, dest='conn_attempts',
          help="number of times to attempt reconnect")),
    (('--reconnect-time-wait',),
     dict(type=int, default=10, dest='conn_wait',
          help="how long to wait between reconnect attempts")),
)

parser = argparse.ArgumentParser()
for _flags, _options in _CLI_OPTIONS:
    parser.add_argument(*_flags, **_options)
args = parser.parse_args()
# Console logging for the script: --debug selects DEBUG, otherwise INFO.
# The same threshold is applied to both the logger and its stream handler.
log_level = logging.DEBUG if args.debug else logging.INFO

logger = logging.getLogger('script')
logger.setLevel(log_level)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(log_level)
ch.setFormatter(formatter)
logger.addHandler(ch)
# A NATS address is mandatory (from -a/--nats-address or $NATS_ADDRESS).
if not args.nats_address:
    logger.critical("No NATS cluster specified")
    parser.print_usage()
    # print_usage() returns None, so the previous exit(parser.print_usage())
    # terminated with status 0. Exit non-zero so callers see the failure.
    raise SystemExit(1)
logger.debug("Using nats address: %s", args.nats_address)
# Load Kubernetes credentials: the in-cluster service account when
# --in-cluster is given, otherwise the local kubeconfig. Both paths share one
# error handler so that any configuration failure is logged and exits with
# status 2 (previously only the kubeconfig path was guarded).
try:
    if args.in_cluster:
        config.load_incluster_config()
    else:
        config.load_kube_config()
except Exception as e:
    logger.critical("Error creating Kubernetes configuration: %s", e)
    raise SystemExit(2)

# API client used by run() to watch node events.
v1 = client.CoreV1Api()
async def run(loop):
    """Connect to NATS, watch Kubernetes node events and publish deletions.

    Every node event whose type is "DELETED" is serialised to JSON and
    published on the "k8s_events" subject; with --output-events the same JSON
    is also printed to stdout.

    Args:
        loop: the asyncio event loop handed to the NATS client.

    Raises:
        SystemExit: if the initial NATS connection fails (the underlying
            exception is used as the exit message).
    """
    nc = NATS()
    try:
        await nc.connect(
            args.nats_address,
            loop=loop,
            connect_timeout=args.conn_timeout,
            max_reconnect_attempts=args.conn_attempts,
            reconnect_time_wait=args.conn_wait,
        )
        logger.info("Connected to NATS at %s...", nc.connected_url.netloc)
    except Exception as e:
        raise SystemExit(e)

    # Event types that get forwarded to NATS.
    accepted = ("DELETED",)

    # NOTE(review): args.publish_events (-p) is parsed but never consulted;
    # publishing is unconditional. Confirm whether it should gate publish().

    async def get_node_events():
        w = watch.Watch()
        # w.stream() is a blocking, synchronous generator. While this loop is
        # waiting for the next event the asyncio event loop cannot run any
        # other task, including the NATS client's background I/O.
        for event in w.stream(v1.list_node):
            if event['type'] not in accepted:
                continue
            logger.info("Event: %s %s %s", event['type'],
                        event['object'].kind, event['object'].metadata.name)
            msg = {'type': event['type'], 'object': event['raw_object']}
            logger.debug("Raw Message: %s", msg)
            payload = json.dumps(msg)
            await nc.publish("k8s_events", payload.encode('utf-8'))
            # publish() only buffers the message. Flush right away so it is
            # written to the server before control returns to the blocking
            # watch above; without this the subscriber never sees it.
            try:
                await nc.flush(timeout=3)
            except ErrTimeout:
                logger.error("Timed out flushing event to NATS; "
                             "it stays buffered until the next flush")
            if args.enable_output:
                print(payload)

    try:
        await get_node_events()
        await nc.flush(timeout=3)
    finally:
        # Always release the connection, even if the watch fails or the
        # task is cancelled during shutdown.
        await nc.close()
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    main_task = loop.create_task(run(loop))
    # Stop the loop when run() finishes or fails; otherwise run_forever()
    # would keep the process alive with nothing left to do.
    main_task.add_done_callback(lambda t: loop.stop())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info('keyboard shutdown')
        # asyncio.Task.all_tasks() was removed in Python 3.9 and the loop=
        # argument of gather() in 3.10; prefer asyncio.all_tasks() when it
        # exists and let gather() take the loop from the tasks themselves.
        all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
        tasks = asyncio.gather(*all_tasks(loop), return_exceptions=True)
        tasks.add_done_callback(lambda t: loop.stop())
        tasks.cancel()
        # Keep the event loop running until it is either destroyed or all
        # tasks have really terminated
        while not tasks.done() and not loop.is_closed():
            loop.run_forever()
    else:
        # The loop stopped by itself: re-raise any exception from run() so a
        # failure is reported instead of being silently dropped.
        if main_task.done():
            main_task.result()
    finally:
        logger.info('closing event loop')
        loop.close()
When running this with the event publishing enabled, I can see the event JSON being output.
However, for some reason my receiver isn't actually getting a NATS message for the deletion event.
You can validate message delivery through the NATS server logs. When configuring the NATS server, temporarily enable debug and trace logging, either by passing the -DV flag to the server or by setting the following in the NATS configuration file:
debug=true
trace=true
You should see something like this:
[31070] 2019/09/10 13:34:40.426198 [DBG] 127.0.0.1:53203 - cid:6 - Client connection created
[31070] 2019/09/10 13:34:40.426582 [TRC] 127.0.0.1:53203 - cid:6 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Subscriber","lang":"go","version":"1.7.0","protocol":1,"echo":true}]
[31070] 2019/09/10 13:34:40.426614 [TRC] 127.0.0.1:53203 - cid:6 - <<- [PING]
[31070] 2019/09/10 13:34:40.426625 [TRC] 127.0.0.1:53203 - cid:6 - ->> [PONG]
[31070] 2019/09/10 13:34:40.426804 [TRC] 127.0.0.1:53203 - cid:6 - <<- [SUB k8s_events 1]
[31070] 2019/09/10 13:34:40.426821 [TRC] 127.0.0.1:53203 - cid:6 - <<- [PING]
[31070] 2019/09/10 13:34:40.426827 [TRC] 127.0.0.1:53203 - cid:6 - ->> [PONG]
[31070] 2019/09/10 13:34:44.167844 [DBG] ::1:53206 - cid:7 - Client connection created
[31070] 2019/09/10 13:34:44.168352 [TRC] ::1:53206 - cid:7 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Publisher","lang":"go","version":"1.7.2","protocol":1,"echo":true}]
[31070] 2019/09/10 13:34:44.168383 [TRC] ::1:53206 - cid:7 - <<- [PING]
[31070] 2019/09/10 13:34:44.168390 [TRC] ::1:53206 - cid:7 - ->> [PONG]
[31070] 2019/09/10 13:34:44.168594 [TRC] ::1:53206 - cid:7 - <<- [PUB k8s_events 11]
[31070] 2019/09/10 13:34:44.168607 [TRC] ::1:53206 - cid:7 - <<- MSG_PAYLOAD: ["{json data}"]
[31070] 2019/09/10 13:34:44.168623 [TRC] 127.0.0.1:53203 - cid:6 - ->> [MSG k8s_events 1 11]
[31070] 2019/09/10 13:34:44.168648 [TRC] ::1:53206 - cid:7 - <<- [PING]
[31070] 2019/09/10 13:34:44.168653 [TRC] ::1:53206 - cid:7 - ->> [PONG]
Using the connection ID, you can see that connection ID 7 published 11 bytes to k8s_events (the protocol message PUB k8s_events 11 with the message payload following it), and connection ID 6 (the subscriber) received the message (MSG k8s_events 1 11).
This is one way you can check that your client is publishing the message and your subscribers are listening to the correct subject.
If you love us, you can donate via PayPal or buy us a coffee so we can maintain and grow the site. Thank you!
Donate Us With