
Sniffer/monitor in client/server config with ZMQ in Python

I implemented a client/server via ZeroMQ and I would like to add a sniffer/monitor to capture the communication between the two.

              client <---------> server
              (REQ)       |         (REP)
                          | 
                          |
                          v
                        sniffer  <-this is what I want to add

If the client and server communicate over port 5555, say, how can I add a sniffer that listens on the same socket? Is there a way to distinguish which messages come from the client and which from the server? Can someone share their experience?

Edited according to Jan's answer

The configuration would become:

client [REQ]----->[ROUTER:4444] monitor [DEALER]------->[REP:5555] server
                              [PUB:7777]
                                  ^
                                  |
                                  |
                                  |
                                  | 
                                [SUB] 
                            monitorclient(sniffer)  <-this is what I want to add

Arrows show direction of connection (heading to bound port).

Messages are flowing:

  • client -> monitor -> server -> monitor -> client
  • and also monitor -> monitorclient

There is a nicer picture here.

flamenco, asked May 27 '14 04:05


1 Answer

For sniffing, we need some intermediate component.

zmq offers a couple of options:

  • write your own program that accepts requests on one side, sends them on, gets the response, sends it back to the original requester, and reports this traffic to you
  • use zmq.proxy - however, this requires a recent version of libzmq (major version 3 or later, i.e. zmq.zmq_version_info()[0] >= 3), which is currently not even available on my Ubuntu 14.04, so I skip this.
  • use MonitoredQueue - this is what you probably want. It provides a loop exchanging messages between frontend and backend, while publishing/pushing/sending them to another socket.
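
The first option, writing your own program, could look roughly like the sketch below. It is only an illustration of the idea, not how MonitoredQueue is implemented. The names relay and run_monitor are hypothetical, the ports (4444 for clients, 5555 for the server, 7777 for captured traffic) are the ones assumed in the rest of this answer, and frames are assumed to be bytes as on Python 3.

```python
def relay(src, dst, mon, prefix):
    """Forward one multipart message from src to dst.

    A copy with `prefix` as an extra first frame is sent to mon.
    The sockets only need recv_multipart() / send_multipart().
    """
    frames = src.recv_multipart()
    mon.send_multipart([prefix] + frames)
    dst.send_multipart(frames)
    return frames


def run_monitor(frontend_url="tcp://*:4444",
                server_url="tcp://localhost:5555",
                capture_url="tcp://*:7777"):
    """Hypothetical hand-written monitor; blocks forever."""
    import zmq  # pyzmq, imported here so relay() has no zmq dependency

    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)  # REQ clients connect here
    backend = context.socket(zmq.DEALER)   # connects to the REP server
    capture = context.socket(zmq.PUB)      # sniffers subscribe here
    frontend.bind(frontend_url)
    backend.connect(server_url)
    capture.bind(capture_url)

    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    while True:
        ready = dict(poller.poll())
        if frontend in ready:   # request on its way to the server
            relay(frontend, backend, capture, b"in")
        if backend in ready:    # reply on its way back to the client
            relay(backend, frontend, capture, b"out")
```

Calling run_monitor() would play the same role as the MonitoredQueue described below. The ROUTER socket adds the client identity as the first frame, which is what lets the reply find its way back to the right client.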

The plan

This solution is based on the MonitoredQueue example from the pyzmq docs.

Server bound to port 5555

The server will be bound to port 5555. Unlike other examples, I will keep your server as the fixed part and not change it to connect to the MonitoredQueue. However, such a swap is possible and will not cause any problems (as long as you adjust the MonitoredQueue accordingly).

MonitoredQueue bound to port 4444, connected to port 5555, publishing traffic on port 7777

MonitoredQueue sits between client and server. It listens on port 4444, forwards requests to the server and responses back to the client. At the same time, every message passing through is published on the PUB socket with the prefix "in" (requests) or "out" (responses). As we will see later, these published messages contain not only the prefix and the request/response, but also the identity of the client.

Client connecting to port 4444

The client could connect directly to the server on port 5555, but this would not allow us to sniff the traffic. For this reason, we will connect the client to port 4444, where the MonitoredQueue is waiting to serve and sniff.

You will see that the client and server do not have to change a line of code to participate in this exchange (the client only connects to a different port).

Real code

server.py

In our case, the server expects a string which can be converted to an integer and returns back a string with doubled value.

import zmq

def double_server(server_url="tcp://*:5555"):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(server_url)
    print("server started...")
    while True:
        req = socket.recv()  # the request arrives as bytes
        print("server received request", req.decode())
        result = str(2 * int(req))
        socket.send(result.encode())  # frames must be sent as bytes
        print("server replied with", result)

if __name__ == "__main__":
    double_server()

client.py

Our client sends 5 requests to port 4444 on localhost and prints each result.

import zmq

def client(server_url="tcp://localhost:4444"):
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    # socket.setsockopt(zmq.IDENTITY, b"client_id_abc") # see the note on identity below
    socket.connect(server_url)

    for i in range(5):
        print("request", i)
        socket.send(str(i).encode())  # frames must be sent as bytes
        res = socket.recv()
        print(i, "result: ", res.decode())

if __name__ == "__main__":
    client()

You can try connecting directly to port 5555 now to see that it works, but for our sniffing the client must talk to the MonitoredQueue.

monitor.py

Here comes all the magic. pyzmq already provides the MonitoredQueue device, so we can simply take it and use it.

import zmq
from zmq.devices.monitoredqueuedevice import MonitoredQueue

def monitoredqueue(frontend_url="tcp://*:4444", server_url="tcp://localhost:5555", capture_url="tcp://*:7777"):
    # ROUTER faces the clients, DEALER faces the server, PUB publishes the captured traffic
    mondev = MonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB, b"in", b"out")
    mondev.bind_in(frontend_url)
    mondev.connect_out(server_url)
    mondev.bind_mon(capture_url)
    # zmq.HWM is the libzmq 2.x option name; libzmq 3+ splits it into zmq.SNDHWM and zmq.RCVHWM
    mondev.setsockopt_in(zmq.HWM, 1)
    # start() runs the device in the foreground and blocks, so the print below is not reached while it runs
    mondev.start()
    print("monitored queue started")

if __name__ == "__main__":
    monitoredqueue()

Note about socket types and aliases:

  • zmq.ROUTER used to be called zmq.XREP
  • zmq.DEALER used to be called zmq.XREQ
  • these aliases are still working.

The MonitoredQueue publishes each message passing through on the zmq.PUB socket on port 7777. These messages are prefixed with "in" or "out" and also contain one frame with an identity string. This identity is assigned by the ROUTER socket and, for the duration of a connection, is unique among all connected clients. The identity is part of the so-called envelope and is separated from the request/reply message by an empty frame (as will be seen soon).

monitorclient.py

This monitor client is here just to show how to get at the sniffed information.

It subscribes to port 7777, served by the monitor (MonitoredQueue), and prints whatever it receives. It is important to consume the whole multipart message, otherwise we would miss some information.

import zmq

def monitorclient(server_url="tcp://localhost:7777"):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect(server_url)
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: subscribe to everything
    print("started monitoring client")

    while True:
        res = socket.recv_multipart()  # list of frames, one captured message
        print(res)  # on Python 3 each frame is shown with a b'' prefix

if __name__ == "__main__":
    monitorclient()

Run it

We will need 4 consoles open; in each we will start one Python script.

Start the server first:

$ python server.py

Start the MonitoredQueue:

$ python monitor.py

Start the monitor client, which reads the sniffed messages:

$ python monitorclient.py

Finally, start the client trying to get some response from the server proxied by MonitoredQueue

$ python client.py
request 0
0 result:  0
request 1
1 result:  2
request 2
2 result:  4
request 3
3 result:  6
request 4
4 result:  8

Results are as expected.

Now check the server.py output:

$ python server.py
server received request 0
server replied with 0
server received request 1
server replied with 2
server received request 2
server replied with 4
server received request 3
server replied with 6
server received request 4
server replied with 8

No surprise, all goes well.

Our monitor.py does not print anything, so we have to check the output from monitorclient.py:

$ python monitorclient.py 
started monitoring client
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '1']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '3']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '6']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '8']

Here you see a printout of all 10 messages: 5 requests and 5 responses.

Each message has the structure [prefix, identity, emptyframe, message], where

  • prefix is either "in" or "out"
  • identity is a string assigned to the particular client by the MonitoredQueue's ROUTER socket. Each time the client connects, this identity might change. As a bonus, we can connect multiple clients and still distinguish between them. If you need specific client identities, see the commented-out socket.setsockopt(zmq.IDENTITY, ...) line in client.py. If you uncomment it, you will see "client_id_abc" as the identity of your client.
  • emptyframe is seen as '' and delimits the envelope from the message data.
  • message is what the client asked or what the server replied.
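
Given that structure, a sniffer can split each captured message and match replies to requests. The following is a minimal sketch, not part of pyzmq: split_captured and pair_exchanges are hypothetical helper names, frames are assumed to be bytes (as recv_multipart() returns them on Python 3), and each client is assumed to have only one request in flight at a time, as is the case with REQ/REP.

```python
def split_captured(frames):
    """Split one captured message into (prefix, envelope, payload).

    envelope is the list of identity frames before the empty delimiter,
    payload is the list of frames after it.
    """
    prefix, rest = frames[0], frames[1:]
    delimiter = rest.index(b"")  # first empty frame ends the envelope
    return prefix, rest[:delimiter], rest[delimiter + 1:]


def pair_exchanges(captured):
    """Yield (identity, request, reply) for each completed exchange.

    Assumes REQ/REP lock-step: the next "out" seen for an identity
    answers the last "in" seen for that same identity.
    """
    pending = {}
    for frames in captured:
        prefix, envelope, payload = split_captured(frames)
        identity = envelope[0]
        if prefix == b"in":
            pending[identity] = payload
        elif identity in pending:
            yield identity, pending.pop(identity), payload
```

In monitorclient.py, the result of socket.recv_multipart() could be passed to split_captured to tell requests ("in") from replies ("out") and to see which client they belong to.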

Conclusions

  • sniffing works, and PyZMQ already offers the MonitoredQueue device for this purpose
  • with zmq.PUB, the sniffing will not block any communication; you may simply ignore the sniffed data and everything will still work.
  • for production, it would be practical to make MonitoredQueue a fixed part of the system, bound to a known IP address and port. This would require a change on the server, which would have to connect (instead of binding as it does now). Such a change is trivial and does not affect the rest of the code or its behaviour. If you have only one endpoint to monitor, you could also embed the monitor into the server (this would require 2 threads, one for the server, another for the monitor).
  • zmq is great "Lego" for this sort of task.

Jan Vlcinsky, answered Nov 06 '22 05:11