Debugging NATS messages from Kubernetes event

I have a simple script which watches Kubernetes events and then publishes a message to a NATS server:

#!/usr/bin/env python
import asyncio
import argparse
import json
import logging
import os

from kubernetes import client, config, watch

from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

# monkey patch
from kube import local_load_oid_token
config.kube_config.KubeConfigLoader._load_oid_token = local_load_oid_token

parser = argparse.ArgumentParser()
parser.add_argument('--in-cluster', help="use in cluster kubernetes config", action="store_true")
parser.add_argument('-a', '--nats-address', help="address of nats cluster", default=os.environ.get('NATS_ADDRESS', None))
parser.add_argument('-d', '--debug', help="enable debug logging", action="store_true")
parser.add_argument('-p', '--publish-events', help="publish events to NATS", action="store_true")
parser.add_argument('--output-events', help="output all events to stdout", action="store_true", dest='enable_output')
parser.add_argument('--connect-timeout', help="NATS connect timeout (s)", type=int, default=10, dest='conn_timeout')
parser.add_argument('--max-reconnect-attempts', help="number of times to attempt reconnect", type=int, default=1, dest='conn_attempts')
parser.add_argument('--reconnect-time-wait', help="how long to wait between reconnect attempts", type=int, default=10, dest='conn_wait')
args = parser.parse_args()

logger = logging.getLogger('script')
ch = logging.StreamHandler()
if args.debug:
    logger.setLevel(logging.DEBUG)
    ch.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)
    ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)

if not args.nats_address:
    logger.critical("No NATS cluster specified")
    parser.print_usage()  # returns None, so exit(parser.print_usage()) would exit with status 0
    raise SystemExit(1)
else:
    logger.debug("Using nats address: %s", args.nats_address)

if args.in_cluster:
    config.load_incluster_config()
else:
    try:
        config.load_kube_config()
    except Exception as e:
        logger.critical("Error creating Kubernetes configuration: %s", e)
        exit(2)

v1 = client.CoreV1Api()


async def run(loop):
    nc = NATS()
    try:
        await nc.connect(args.nats_address, loop=loop, connect_timeout=args.conn_timeout, max_reconnect_attempts=args.conn_attempts, reconnect_time_wait=args.conn_wait)
        logger.info("Connected to NATS at %s...", nc.connected_url.netloc)
    except Exception as e:
        exit(e)

    #print("Connected to NATS at {}...".format(nc.connected_url.netloc))

    async def get_node_events():
        w = watch.Watch()
        for event in w.stream(v1.list_node):
            accepted = ["DELETED"]
            if event['type'] in accepted:
                logger.info("Event: %s %s %s", event['type'], event['object'].kind, event['object'].metadata.name)
                msg = {'type': event['type'], 'object': event['raw_object']}
                logger.debug("Raw Message: %s", msg)
                await nc.publish("k8s_events", json.dumps(msg).encode('utf-8'))
                if args.enable_output:
                    print(json.dumps(msg))

    await get_node_events()
    await nc.flush(timeout=3)
    await nc.close()




if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    loop.create_task(run(loop))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info('keyboard shutdown')
        # asyncio.Task.all_tasks() was removed in Python 3.9 and gather(loop=...) in 3.10
        tasks = asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)
        tasks.add_done_callback(lambda t: loop.stop())
        tasks.cancel()

        # Keep the event loop running until it is either destroyed or all
        # tasks have really terminated
        while not tasks.done() and not loop.is_closed():
            loop.run_forever()
    finally:
        logger.info('closing event loop')
        loop.close()

When running this with the event publishing enabled, I can see the event JSON being output.

However, for some reason my receiver isn't actually getting a NATS message for the deletion event.

  • How can I debug whether the message made it onto the topic? Is there anything I can add to the code to validate that the message made it onto the topic?
  • Is my asyncio logic correct here?
  • Why might the deletion event not make it onto the topic with this logic?
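On the asyncio question, one thing worth checking: `w.stream(v1.list_node)` from the `kubernetes` client is an ordinary synchronous generator, so the `for` loop in `get_node_events()` blocks the thread, and with it the whole event loop, while it waits for the next event. Other tasks on the loop only run when the coroutine actually suspends. Whether `await nc.publish(...)` suspends, or instead hands the write to a background flusher task, depends on the NATS client version; treat that part as an assumption to verify, not a confirmed diagnosis. The self-contained sketch below shows the general effect using only the standard library: `blocking_events()` is a hypothetical stand-in for the watch stream, and `background_task()` is a hypothetical stand-in for work a client schedules on the loop. Iterating the generator directly starves the background task; pulling each item through `asyncio.to_thread` (Python 3.9+) keeps the loop free.

```python
import asyncio
import contextlib
import time


def blocking_events(n, delay=0.05):
    """Hypothetical stand-in for watch.Watch().stream(): a plain, blocking generator."""
    for i in range(n):
        time.sleep(delay)  # blocks the thread, like waiting on the API server
        yield {"type": "DELETED", "name": f"node-{i}"}


async def background_task(ticks):
    """Hypothetical stand-in for work scheduled on the loop (e.g. a client's flusher)."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def _stop(task):
    """Cancel a task and wait for it to finish."""
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def consume_blocking(n):
    """Mirrors the question's pattern: iterate the blocking generator directly."""
    ticks = []
    task = asyncio.create_task(background_task(ticks))
    for _event in blocking_events(n):
        pass  # nothing here suspends, so background_task never gets a turn
    await _stop(task)
    return len(ticks)


async def consume_offloaded(n):
    """Pull each event in a worker thread so the event loop stays responsive."""
    ticks = []
    task = asyncio.create_task(background_task(ticks))
    events = blocking_events(n)
    done = object()  # sentinel returned by next() once the generator is exhausted
    while True:
        event = await asyncio.to_thread(next, events, done)  # Python 3.9+
        if event is done:
            break
    await _stop(task)
    return len(ticks)


if __name__ == "__main__":
    print("blocking: ", asyncio.run(consume_blocking(5)))
    print("offloaded:", asyncio.run(consume_offloaded(5)))
```

Applied to the script, the analogous change would be to fetch each watch event through `asyncio.to_thread(...)` or `loop.run_in_executor(None, ...)` instead of iterating `w.stream(...)` inside the coroutine. Independently of that, `await nc.flush(timeout=3)` is only reached after `get_node_events()` returns, that is, after the watch stream ends.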
jaxxstorm asked Oct 22 '25 03:10


1 Answer

You can validate message delivery through the NATS server logs. When configuring the NATS server, temporarily enable debug and trace logging, either by passing the -DV flag to the server or by adding the following to the NATS configuration file:

debug=true
trace=true 

You should see something like this:

[31070] 2019/09/10 13:34:40.426198 [DBG] 127.0.0.1:53203 - cid:6 - Client connection created
[31070] 2019/09/10 13:34:40.426582 [TRC] 127.0.0.1:53203 - cid:6 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Subscriber","lang":"go","version":"1.7.0","protocol":1,"echo":true}]
[31070] 2019/09/10 13:34:40.426614 [TRC] 127.0.0.1:53203 - cid:6 - <<- [PING]
[31070] 2019/09/10 13:34:40.426625 [TRC] 127.0.0.1:53203 - cid:6 - ->> [PONG]
[31070] 2019/09/10 13:34:40.426804 [TRC] 127.0.0.1:53203 - cid:6 - <<- [SUB k8s_events  1]
[31070] 2019/09/10 13:34:40.426821 [TRC] 127.0.0.1:53203 - cid:6 - <<- [PING]
[31070] 2019/09/10 13:34:40.426827 [TRC] 127.0.0.1:53203 - cid:6 - ->> [PONG]
[31070] 2019/09/10 13:34:44.167844 [DBG] ::1:53206 - cid:7 - Client connection created
[31070] 2019/09/10 13:34:44.168352 [TRC] ::1:53206 - cid:7 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Publisher","lang":"go","version":"1.7.2","protocol":1,"echo":true}]
[31070] 2019/09/10 13:34:44.168383 [TRC] ::1:53206 - cid:7 - <<- [PING]
[31070] 2019/09/10 13:34:44.168390 [TRC] ::1:53206 - cid:7 - ->> [PONG]
[31070] 2019/09/10 13:34:44.168594 [TRC] ::1:53206 - cid:7 - <<- [PUB k8s_events 11]
[31070] 2019/09/10 13:34:44.168607 [TRC] ::1:53206 - cid:7 - <<- MSG_PAYLOAD: ["{json data}"]
[31070] 2019/09/10 13:34:44.168623 [TRC] 127.0.0.1:53203 - cid:6 - ->> [MSG k8s_events 1 11]
[31070] 2019/09/10 13:34:44.168648 [TRC] ::1:53206 - cid:7 - <<- [PING]
[31070] 2019/09/10 13:34:44.168653 [TRC] ::1:53206 - cid:7 - ->> [PONG]

Using the connection IDs, you can see that connection 7 (the publisher) published 11 bytes to k8s_events (the protocol message PUB k8s_events 11, followed by the message payload), and that connection 6 (the subscriber) received the message (MSG k8s_events 1 11, where 1 is the subscription ID from that connection's earlier SUB k8s_events 1).

This is one way you can check that your client is publishing the message and your subscribers are listening to the correct subject.
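The same cross-check can be done mechanically over a saved trace. The sketch below assumes the trace format shown in this log (`<<- [PUB <subject> <bytes>]` on the publisher's connection and `->> [MSG <subject> <sid> <bytes>]` on the subscriber's); `summarize_trace` is a hypothetical helper, and its patterns deliberately ignore the reply-subject variants of PUB and MSG.

```python
import re

# Publisher -> server, no reply subject, e.g. "cid:7 - <<- [PUB k8s_events 11]"
PUB_RE = re.compile(r"cid:(\d+) - <<- \[PUB (\S+) (\d+)\]")
# Server -> subscriber, no reply subject, e.g. "cid:6 - ->> [MSG k8s_events 1 11]"
MSG_RE = re.compile(r"cid:(\d+) - ->> \[MSG (\S+) (\d+) (\d+)\]")


def summarize_trace(lines, subject):
    """Hypothetical helper: collect PUBs to, and MSG deliveries of, `subject` from trace lines."""
    published = []  # (publisher cid, payload bytes)
    delivered = []  # (subscriber cid, subscription id, payload bytes)
    for line in lines:
        pub = PUB_RE.search(line)
        if pub and pub.group(2) == subject:
            published.append((int(pub.group(1)), int(pub.group(3))))
            continue
        msg = MSG_RE.search(line)
        if msg and msg.group(2) == subject:
            delivered.append((int(msg.group(1)), int(msg.group(3)), int(msg.group(4))))
    return published, delivered


if __name__ == "__main__":
    # Two lines taken from the trace above
    sample = [
        "[31070] 2019/09/10 13:34:44.168594 [TRC] ::1:53206 - cid:7 - <<- [PUB k8s_events 11]",
        "[31070] 2019/09/10 13:34:44.168623 [TRC] 127.0.0.1:53203 - cid:6 - ->> [MSG k8s_events 1 11]",
    ]
    pubs, msgs = summarize_trace(sample, "k8s_events")
    print("published:", pubs)
    print("delivered:", msgs)
```

Run against a full log, for example `summarize_trace(open("nats.log"), "k8s_events")` (the filename is hypothetical), a non-empty `published` list with an empty `delivered` list would suggest nothing on this server was subscribed to that subject when the message arrived, while an empty `published` list points back at the publishing side.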

Colin Sullivan answered Oct 23 '25 17:10


