I need a fast way to send 300 short messages a second over zeromq
between the python multiprocessing processes. Each message needs to contain an ID
and time.time()
msgpack
seems like the best way to serialize the dict before sending it via zeromq
, and conveniently, msgpack
has an example of exactly what I need, except it has a datetime.datetime.now()
.
import datetime
import msgpack
useful_dict = {
"id": 1,
"created": datetime.datetime.now(),
}
def decode_datetime(obj):
if b'__datetime__' in obj:
obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
return obj
def encode_datetime(obj):
if isinstance(obj, datetime.datetime):
return {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
return obj
packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)
The problem is that their example doesn't work, I get this error:
obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
KeyError: 'as_str'
Maybe because I'm on python 3.4, but I don't know what the issue with strptime. Would appreciate your help.
Given that messagepack ( import msgpack
) is good at serializing integers, I created a solution which only uses integers:
_datetime_ExtType = 42
def _unpacker_hook(code, data):
if code == _datetime_ExtType:
values = unpack(data)
if len(values) == 8: # we have timezone
return datetime.datetime(*values[:-1], dateutil.tz.tzoffset(None, values[-1]))
else:
return datetime.datetime(*values)
return msgpack.ExtType(code, data)
# This will only get called for unknown types
def _packer_unknown_handler(obj):
if isinstance(obj, datetime.datetime):
if obj.tzinfo:
components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, int(obj.utcoffset().total_seconds()))
else:
components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond)
# we effectively double pack the values to "compress" them
data = msgpack.ExtType(_datetime_ExtType, pack(components))
return data
raise TypeError("Unknown type: {}".format(obj))
def pack(obj, **kwargs):
# we don't use a global packer because it wouldn't be re-entrant safe
return msgpack.packb(obj, use_bin_type=True, default=_packer_unknown_handler, **kwargs)
def unpack(payload):
try:
# we temporarily disable gc during unpack to bump up perf: https://pypi.python.org/pypi/msgpack-python
gc.disable()
# This must match the above _packer parameters above. NOTE: use_list is faster
return msgpack.unpackb(payload, use_list=False, encoding='utf-8', ext_hook=_unpacker_hook)
finally:
gc.enable()
Python3 and Python2 manage differently strings encoding : encoding-and-decoding-strings-in-python-3-x
Then it is needed to :
b'as_str'
(instead of 'as_str'
) as dictionary keyencode
and decode
for the stored valueModifying the code like this works with python2 and python3 :
import datetime
import msgpack
useful_dict = {
"id": 1,
"created": datetime.datetime.now(),
}
def decode_datetime(obj):
if b'__datetime__' in obj:
obj = datetime.datetime.strptime(obj[b'as_str'].decode(), "%Y%m%dT%H:%M:%S.%f")
return obj
def encode_datetime(obj):
if isinstance(obj, datetime.datetime):
obj = {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f").encode()}
return obj
packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With