HTTP server in ZMQ or How to handle a POST request with pyzmq?

I'm trying to create an HTTP server with a ZMQ_STREAM socket.

When I send a simple POST request:

POST / HTTP/1.1
Host: localhost:5555
Cache-Control: no-cache
Postman-Token: 67004be5-56bc-c1a9-847a-7db3195c301d

Apples to Oranges!

Here is how I handle this with pyzmq:

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")

while True:
    # Get HTTP request
    parts = []
    id_, msg = socket.recv_multipart()  # [id, ''] or [id, http request]
    parts.append(id_)
    parts.append(msg)
    if not msg:
        # This is a new connection - this is just the identity frame (throw away id_)
        # The body will come next
        id_, msg = socket.recv_multipart()  # [id, http request]
        parts.append(id_)
        parts.append(msg)

        end = socket.recv_multipart()  # [id*, ''] <- some kind of junk?
        parts.append(end)

    print(repr(parts))

So that parts list comes out to be:

['\x00\x80\x00\x00)', '', '\x00\x80\x00\x00)', 'POST / HTTP/1.1\r\nHost: localhost:5555\r\nConnection: keep-alive\r\nContent-Length: 18\r\nCache-Control: no-cache\r\nOrigin: chrome-extension://fhbjgbiflinjbdggehcddcbncdddomop\r\nContent-Type: text/plain;charset=UTF-8\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36\r\nPostman-Token: 9503fce9-8b1c-b39c-fb4d-3a7f21b509de\r\nAccept: */*\r\nAccept-Encoding: gzip, deflate\r\nAccept-Language: en-US,en;q=0.8,ru;q=0.6,uk;q=0.4\r\n\r\nApples to Oranges!', ['\x00\x80\x00\x00*', '']]

So I understand that:

  1. '\x00\x80\x00\x00)', '' is the identity of the connection. It is sent initially by the ZMQ_STREAM socket; on subsequent requests it seems to be absent.
  2. \x00\x80\x00\x00) is the identity again; this is what we see on subsequent requests from the client on the ZMQ_STREAM socket.
  3. Then comes the actual HTTP request.

But the last pair of magic numbers: ['\x00\x80\x00\x00*', '']

What the heck does that stand for?

References:

  1. ZeroMQ 4.0 zmq_socket API reference: http://api.zeromq.org/4-0:zmq-socket
  2. HTTP/1.1 specification (RFC 2616), section 5: http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html
asked Oct 14 '15 00:10 by Andriy Drozdyuk


1 Answer

But the last pair of magic numbers: ['\x00\x80\x00\x00*', ''] What the heck does that stand for?

That's a new connection, with a new connection ID. The connection ID is an integer counter: using the Python builtin ord, you can see that ord(')') = 41 and ord('*') = 42, which is the next number in the sequence.
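To check this on your own frames, the identities can be decoded to integers and each [identity, payload] message labeled. This is a minimal sketch; `connection_number` and `label_frames` are hypothetical helpers. It assumes, based only on the frames shown in the question, that an identity is one zero byte followed by a 4-byte big-endian counter, and that an empty payload means "connection opened" for an unseen identity and "connection closed" for a known one.

```python
def connection_number(identity: bytes) -> int:
    """Decode a ZMQ_STREAM identity frame into its counter value.

    Assumption: one zero byte followed by a 4-byte big-endian integer,
    which matches the frames shown in the question.
    """
    if len(identity) != 5 or identity[0] != 0:
        raise ValueError("unexpected identity layout: %r" % identity)
    return int.from_bytes(identity[1:], "big")


def label_frames(messages):
    """Label each (identity, payload) pair received from a STREAM socket.

    Assumption: an empty payload from an unseen identity announces a new
    connection; an empty payload from a known identity is a disconnect.
    """
    seen = set()
    labels = []
    for identity, payload in messages:
        if payload:
            labels.append(("data", connection_number(identity)))
        elif identity in seen:
            seen.discard(identity)
            labels.append(("disconnect", connection_number(identity)))
        else:
            seen.add(identity)
            labels.append(("connect", connection_number(identity)))
    return labels


# The three messages from the question (request text shortened here)
messages = [
    (b'\x00\x80\x00\x00)', b''),
    (b'\x00\x80\x00\x00)', b'POST / HTTP/1.1\r\n...'),
    (b'\x00\x80\x00\x00*', b''),
]
for label, number in label_frames(messages):
    print(label, hex(number))
# connect 0x80000029
# data 0x80000029
# connect 0x8000002a
```

Under these assumptions, the trailing pair is the opening notification of a second connection whose counter is one higher than the first.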

When writing an HTTP server with ZMQ_STREAM, you have to be careful, because it's more complicated than receiving a single message after the connection is established. The main issue is that a single message is not guaranteed to contain a complete request: the body can arrive in chunks spread across several messages. You have to parse the HTTP headers and handle receiving the body in pieces.
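The bookkeeping this requires can be sketched without any socket at all. `RequestBuffer` below is a hypothetical class, not part of pyzmq: it is fed the data frames of one connection in order and reports when the headers and a Content-Length body are both complete. It assumes no chunked transfer encoding and does only minimal header parsing.

```python
class RequestBuffer:
    """Accumulate data frames from one connection until an HTTP request is complete.

    Hypothetical helper: only handles bodies whose length is given by
    Content-Length (no chunked transfer encoding).
    """

    def __init__(self):
        self.data = b''
        self.header_end = None      # index just past the blank line, once seen
        self.content_length = None

    def feed(self, chunk: bytes) -> bool:
        """Add one data frame; return True once headers and body are complete."""
        self.data += chunk
        if self.header_end is None:
            sep = self.data.find(b'\r\n\r\n')
            if sep == -1:
                return False        # headers are not finished yet
            self.header_end = sep + 4
            self.content_length = 0  # no Content-Length means no body
            # Skip the request line, then look for a Content-Length header
            for line in self.data[:sep].split(b'\r\n')[1:]:
                name, _, value = line.partition(b':')
                if name.strip().lower() == b'content-length':
                    self.content_length = int(value.strip())
                    break
        return len(self.data) - self.header_end >= self.content_length

    @property
    def body(self) -> bytes:
        return self.data[self.header_end:self.header_end + self.content_length]


# Usage: the question's request, split at arbitrary points across three frames
buf = RequestBuffer()
frames = [
    b'POST / HTTP/1.1\r\nHost: localhost:5555\r\nContent-Le',
    b'ngth: 18\r\n\r\nApples to ',
    b'Oranges!',
]
for frame in frames:
    done = buf.feed(frame)
print(done, buf.body)  # True b'Apples to Oranges!'
```

Even the header block itself can be split across frames, which is why the sketch buffers until it sees the blank line before looking at Content-Length.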

Here is an example that handles POST requests coming from curl:

from traceback import print_exc
import zmq
from tornado.httputil import HTTPHeaders

class BadRequest(Exception):
    pass

class ConnectionLost(Exception):
    pass

def parse_request(request):
    """Parse a request verb, path, and headers"""
    first_line, header_lines = request.split(b'\r\n', 1)
    verb, path, proto = first_line.decode('utf8').split()
    headers = HTTPHeaders.parse(header_lines.decode('utf8', 'replace'))
    return verb, path, headers


def recv_body(socket, headers, chunks, request_id):
    """Receive the body of a request, which may arrive in several chunks"""
    if 'Content-Length' not in headers:
        if 'Transfer-Encoding' not in headers:
            # A request with neither header has no body
            return b''
        # Don't support chunked transfer: http://tools.ietf.org/html/rfc2616#section-3.6.1
        print("Only support specified-length requests")
        socket.send_multipart([
            request_id, b'HTTP/1.1 400 Bad Request\r\n\r\n',
            request_id, b'',
        ])
        msg = None
        while msg != b'':
            # discard frames until an empty (connection notification) frame arrives
            _, msg = socket.recv_multipart()
        raise BadRequest("Only support specified-length requests")

    if headers.get('expect', '').lower() == '100-continue':
        # The client waits for this interim response before sending the body.
        # ZMQ_SNDMORE is ignored on STREAM data frames, so no flags are needed.
        socket.send_multipart([request_id, b'HTTP/1.1 100 Continue\r\n\r\n'])

    # Keep receiving until Content-Length bytes of body have arrived,
    # whether or not the client asked for 100-continue
    content_length = int(headers['Content-Length'])
    print("Waiting to receive %i byte body" % content_length)
    while sum(len(chunk) for chunk in chunks) < content_length:
        id_, msg = socket.recv_multipart()
        if msg == b'':
            raise ConnectionLost("Disconnected")
        if id_ != request_id:
            raise ConnectionLost("Received data from wrong ID: %s != %s" % (id_, request_id))
        chunks.append(msg)
    return b''.join(chunks)


print(zmq.__version__, zmq.zmq_version())


socket = zmq.Context().socket(zmq.STREAM)
socket.bind("tcp://*:5555")


while True:
    # Get HTTP request
    request_id, msg = socket.recv_multipart()
    if msg == b'':
        continue
    chunks = []
    try:
        request, first_chunk = msg.split(b'\r\n\r\n', 1)
        if first_chunk:
            chunks.append(first_chunk)
        verb, path, headers = parse_request(request)
        print(verb, path)
        print("Headers:")
        for key, value in headers.items():
            print('  %s: %s' % (key, value))
        body = recv_body(socket, headers, chunks, request_id)
        print("Body: %r" % body)
    except BadRequest as e:
        print("Bad Request: %s" % e)
    except ConnectionLost as e:
        print("Connection Lost: %s" % e)
    except Exception:
        print("Failed to handle request", msg)
        print_exc()
        socket.send_multipart([
            request_id, b'HTTP/1.1 500 Internal Server Error\r\n\r\n',
            request_id, b''])
    else:
        # Send the response, then an empty frame to close the connection
        socket.send_multipart([
            request_id, b'HTTP/1.1 200 OK\r\n\r\n',
            request_id, b''])

The relevant logic for this case is in the recv_body function, which inspects the headers and keeps receiving chunks of the body until Content-Length bytes have arrived.
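The header checks behind that receive loop can also be pulled out into a pure function, which is easier to test than socket code. `body_plan` is a hypothetical helper, not part of any library. It assumes lower-cased header names and that Content-Length bytes should be read whenever that header is present, with or without `Expect: 100-continue`. A request carrying neither Content-Length nor Transfer-Encoding is treated as having no body, which is what HTTP/1.1 specifies for requests.

```python
from typing import Mapping, Tuple


def body_plan(headers: Mapping[str, str]) -> Tuple[str, int]:
    """Decide how to receive a request body from its (lower-cased) headers.

    Returns (action, length):
      ('reject', 0)   - body length unknown (e.g. chunked): answer 400
      ('continue', n) - send '100 Continue' first, then read n bytes
      ('read', n)     - read n bytes (n may be 0)
    """
    length = headers.get('content-length')
    if length is None:
        if 'transfer-encoding' in headers:
            return ('reject', 0)
        return ('read', 0)  # neither header: the request has no body
    n = int(length)
    if headers.get('expect', '').lower() == '100-continue':
        return ('continue', n)
    return ('read', n)


print(body_plan({'content-length': '18'}))  # ('read', 18)
```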

Frankly, I don't think it makes a lot of sense to write an HTTP server in Python using ZMQ_STREAM. You can integrate zmq sockets with existing Python event loops and reuse established HTTP libraries, so you don't have to reinvent this particular wheel. For instance, pyzmq plays especially nicely with the Tornado event loop, and you can use zmq sockets and Tornado HTTP handlers together in the same application.

answered Sep 19 '22 05:09 by minrk