I'm trying to create an HTTP server with a ZMQ_STREAM socket.
When I do a simple POST request:
POST / HTTP/1.1
Host: localhost:5555
Cache-Control: no-cache
Postman-Token: 67004be5-56bc-c1a9-847a-7db3195c301d

Apples to Oranges!
Here is how I handle this with pyzmq:
import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")

while True:
    # Get HTTP request
    parts = []
    id_, msg = socket.recv_multipart()  # [id, ''] or [id, http request]
    parts.append(id_)
    parts.append(msg)
    if not msg:
        # This is a new connection - this is just the identity frame (throw away id_)
        # The body will come next
        id_, msg = socket.recv_multipart()  # [id, http request]
        parts.append(id_)
        parts.append(msg)

        end = socket.recv_multipart()  # [id*, ''] <- some kind of junk?
        parts.append(end)

    print("%s" % repr(parts))
So the parts list comes out to be:
['\x00\x80\x00\x00)', '', '\x00\x80\x00\x00)', 'POST / HTTP/1.1\r\nHost: localhost:5555\r\nConnection: keep-alive\r\nContent-Length: 18\r\nCache-Control: no-cache\r\nOrigin: chrome-extension://fhbjgbiflinjbdggehcddcbncdddomop\r\nContent-Type: text/plain;charset=UTF-8\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36\r\nPostman-Token: 9503fce9-8b1c-b39c-fb4d-3a7f21b509de\r\nAccept: */*\r\nAccept-Encoding: gzip, deflate\r\nAccept-Language: en-US,en;q=0.8,ru;q=0.6,uk;q=0.4\r\n\r\nApples to Oranges!', ['\x00\x80\x00\x00*', '']]
So I understand that:
1. '\x00\x80\x00\x00)', '' is the identity of the connection. This is set initially by the ZMQ_STREAM socket. On subsequent requests it seems to be absent.
2. '\x00\x80\x00\x00)' is the identity again; this is what we see on subsequent requests from the client on the ZMQ_STREAM socket.
3. But the last pair of magic numbers: ['\x00\x80\x00\x00*', '']
What the heck does that stand for?
But the last pair of magic numbers: ['\x00\x80\x00\x00*', ''] What the heck does that stand for?
That's a new connection, with a new connection ID. The connection ID is an integer counter: using the Python builtin ord, you can see that ord(')') = 41 and ord('*') = 42, which is the next number in sequence.
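To make the counter visible, here is a minimal sketch that decodes the two identities from the output above. It assumes the identity is one zero byte followed by a 4-byte big-endian integer, which matches the bytes shown here but is an implementation detail of libzmq rather than a documented guarantee. The helper name connection_counter is made up for this illustration.

```python
import struct


def connection_counter(identity):
    """Decode a 5-byte STREAM identity (hypothetical helper).

    Assumption: 1 zero byte + 4-byte big-endian counter.
    """
    _prefix, counter = struct.unpack(">BI", identity)
    return counter


first = b'\x00\x80\x00\x00)'   # identity of the connection carrying the POST
second = b'\x00\x80\x00\x00*'  # identity seen in the trailing "junk" pair

# The last byte alone already shows the increment
print(ord(')'), ord('*'))

# Decoding the whole identity shows the same thing
print(connection_counter(second) - connection_counter(first))
```

The two identities differ by exactly one, so the final pair belongs to a second connection from the client, not to the first one.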
When writing an HTTP server with ZMQ_STREAM, you have to be careful, because it's more complicated than receiving one message after the connection is established. The main issue is that a request is not guaranteed to arrive complete; the body may be split across several messages. You have to parse the HTTP headers and handle receiving the body in pieces.
Here is an example that handles POST requests coming from curl:
from traceback import print_exc

import zmq
from tornado.httputil import HTTPHeaders


class BadRequest(Exception):
    pass


class ConnectionLost(Exception):
    pass


def parse_request(request):
    """Parse a request verb, path, and headers"""
    first_line, header_lines = request.split(b'\r\n', 1)
    verb, path, proto = first_line.decode('utf8').split()
    headers = HTTPHeaders.parse(header_lines.decode('utf8', 'replace'))
    return verb, path, headers


def recv_body(socket, headers, chunks, request_id):
    """Receive the body of a request"""
    if headers.get('expect', '').lower() == '100-continue':
        if 'Content-Length' not in headers:
            # Don't support chunked transfer: http://tools.ietf.org/html/rfc2616#section-3.6.1
            print("Only support specified-length requests")
            socket.send_multipart([
                request_id, b'HTTP/1.1 400 (Bad Request)\r\n\r\n',
                request_id, b'',
            ])
            msg = None
            while msg != b'':
                # flush until new connection
                _, msg = socket.recv_multipart()
            raise BadRequest("Only support specified-length requests")
        socket.send_multipart([request_id, b'HTTP/1.1 100 (Continue)\r\n\r\n'], zmq.SNDMORE)

    content_length = int(headers['Content-Length'])
    print("Waiting to receive %ikB body" % (content_length / 1024))
    # Keep receiving until the whole body has arrived
    while sum(len(chunk) for chunk in chunks) < content_length:
        id_, msg = socket.recv_multipart()
        if msg == b'':
            raise ConnectionLost("Disconnected")
        if id_ != request_id:
            raise ConnectionLost("Received data from wrong ID: %s != %s" % (id_, request_id))
        chunks.append(msg)
    return b''.join(chunks)


print(zmq.__version__, zmq.zmq_version())

socket = zmq.Context().socket(zmq.STREAM)
socket.bind("tcp://*:5555")

while True:
    # Get HTTP request
    request_id, msg = socket.recv_multipart()
    if msg == b'':
        # Empty frame: connect or disconnect notification, no data yet
        continue
    chunks = []
    try:
        request, first_chunk = msg.split(b'\r\n\r\n', 1)
        if first_chunk:
            chunks.append(first_chunk)
        verb, path, headers = parse_request(request)
        print(verb, path)
        print("Headers:")
        for key, value in headers.items():
            print('  %s: %s' % (key, value))
        body = recv_body(socket, headers, chunks, request_id)
        print("Body: %r" % body)
    except BadRequest as e:
        print("Bad Request: %s" % e)
    except ConnectionLost as e:
        print("Connection Lost: %s" % e)
    except Exception:
        print("Failed to handle request", msg)
        print_exc()
        socket.send_multipart([
            request_id, b'HTTP/1.1 500 (Internal Server Error)\r\n\r\n',
            request_id, b''])
    else:
        socket.send_multipart([
            request_id, b'HTTP/1.1 200 (OK)\r\n\r\n',
            request_id, b''])
The relevant logic for this case is in the recv_body function, which reads the Content-Length header and keeps receiving chunks of the body until the full length has arrived.
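The accumulation step can be tried without a socket. The following is only a sketch, assuming the frames have already been received as (id, data) pairs. The function collect_body and the sample frames are hypothetical names introduced for illustration and are not part of pyzmq.

```python
class ConnectionLost(Exception):
    pass


def collect_body(frames, request_id, content_length, first_chunk=b''):
    """Join body chunks until content_length bytes have arrived.

    `frames` is an iterable of (id, data) pairs standing in for
    successive socket.recv_multipart() results.
    """
    chunks = [first_chunk] if first_chunk else []
    received = len(first_chunk)
    frames = iter(frames)
    while received < content_length:
        try:
            id_, msg = next(frames)
        except StopIteration:
            raise ConnectionLost("Ran out of frames before the body was complete")
        if msg == b'':
            # An empty data frame signals a connect or disconnect event
            raise ConnectionLost("Disconnected")
        if id_ != request_id:
            raise ConnectionLost("Received data from wrong ID")
        chunks.append(msg)
        received += len(msg)
    return b''.join(chunks)


rid = b'\x00\x80\x00\x00)'
# The 18-byte body arrives in three pieces: one with the headers, two later
sample_frames = [(rid, b'to Oran'), (rid, b'ges!')]
body = collect_body(sample_frames, rid, content_length=18, first_chunk=b'Apples ')
print(body)
```

Keeping a running byte count avoids re-summing every chunk on each iteration, which matters only for large bodies split into many frames.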
Frankly, I don't think it makes a lot of sense to write an HTTP server in Python using ZMQ_STREAM. You can integrate zmq sockets with existing Python event loops and reuse already established HTTP libraries, so you don't have to reinvent this particular wheel. For instance, pyzmq plays especially nicely with the tornado event loop, and you can use zmq sockets and tornado HTTP handlers together in the same application.
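As a rough sketch of that combination, assuming both pyzmq and tornado are installed: tornado parses the HTTP request, including a body that arrives in pieces, while a ZMQStream delivers zmq messages on the same event loop. The handler name EchoHandler, the ports 8888 and 5556, and the choice of a PULL socket are arbitrary and only for illustration.

```python
import asyncio

import zmq
from tornado import web
from zmq.eventloop.zmqstream import ZMQStream


class EchoHandler(web.RequestHandler):
    """Hypothetical handler: tornado has already read headers and body."""

    def post(self):
        # self.request.body is the complete request body as bytes
        self.write(self.request.body)


def make_app():
    return web.Application([(r"/", EchoHandler)])


def on_zmq_message(frames):
    # Called on the event loop with the list of frames of one message
    print("zmq message: %r" % frames)


async def main():
    make_app().listen(8888)  # HTTP port, chosen arbitrarily

    pull = zmq.Context.instance().socket(zmq.PULL)
    pull.bind("tcp://127.0.0.1:5556")  # zmq endpoint, also arbitrary
    stream = ZMQStream(pull)  # attaches to the current tornado IOLoop
    stream.on_recv(on_zmq_message)

    await asyncio.Event().wait()  # run until interrupted
```

Start it with asyncio.run(main()). A POST to http://localhost:8888/ is then echoed back, and anything pushed to the zmq endpoint is printed by on_zmq_message.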