Closed aforren1 closed 5 years ago
A few notes:
Script:
import timeit

setup = """
import numpy as np
import msgpack
import msgpack_numpy as m
import pickle as pkl

m.patch()  # let msgpack (de)serialize numpy arrays
msg = np.random.random((20, 30, 5))

def ser(msg):
    return msgpack.packb(msg, use_bin_type=True)

def deser(msg):
    # raw=False replaces encoding='utf-8', which msgpack 1.0 removed
    return msgpack.unpackb(msg, raw=False)

def pickl(msg):
    return pkl.dumps(msg)

def unpickl(msg):
    return pkl.loads(msg)
"""
num = 10000
# mean seconds per round trip
timeit.timeit(stmt='deser(ser(msg))', setup=setup, number=num) / num
timeit.timeit(stmt='unpickl(pickl(msg))', setup=setup, number=num) / num
It should be fairly straightforward to parameterize the options, though, e.g. (a) the serialization method, (b) the pipeline, ...
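A rough sketch of what parameterizing the serialization method could look like. The `bench` helper and the `codecs` dict are hypothetical names, the payload is a plain list of floats (not a numpy array) so that `json` can handle it, and the msgpack entry is only added if the package is installed:

```python
import json
import pickle
import random
import timeit


def bench(codecs, payload, number=1000):
    """Return mean round-trip seconds per call for each named (dumps, loads) pair."""
    results = {}
    for name, (dumps, loads) in codecs.items():
        total = timeit.timeit(lambda: loads(dumps(payload)), number=number)
        results[name] = total / number
    return results


# Each entry maps a label to a (serialize, deserialize) pair.
codecs = {
    "pickle": (pickle.dumps, pickle.loads),
    "json": (lambda obj: json.dumps(obj).encode(), json.loads),
}

try:
    import msgpack  # optional; skipped if not installed

    codecs["msgpack"] = (
        lambda obj: msgpack.packb(obj, use_bin_type=True),
        lambda data: msgpack.unpackb(data, raw=False),
    )
except ImportError:
    pass

# Same number of values as the (20, 30, 5) array, but as a flat list.
payload = [random.random() for _ in range(20 * 30 * 5)]
results = bench(codecs, payload, number=200)
for name, secs in sorted(results.items(), key=lambda kv: kv[1]):
    print(f"{name}: {secs * 1e6:.1f} us per round trip")
```

Adding another option (a different pipeline, a different payload shape) then means adding a dict entry or another argument, not another copy of the script.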
Read time scales with the amount of data, but seems manageable at relatively low data sizes (at "typical" data sizes and rates, one read takes ~100 µs).
import timeit

setup = """
import numpy as np
from io import BytesIO
import msgpack
import msgpack_numpy as m

m.patch()
msg = np.random.random((20))  # "typical" data size

def ser(msg):
    return msgpack.packb(msg, use_bin_type=True)

def write_to_io(x, y):
    x.write(ser(y))

buf = BytesIO()
for i in range(17):  # number of samples in buffer (in this example, num of samples during one frame)
    write_to_io(buf, msg)

def test(x):
    x.seek(0)
    unpacker = msgpack.Unpacker(x)
    return [i for i in unpacker]
"""
num = 1000
timeit.timeit(stmt='test(buf)', setup=setup, number=num) / num
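One way to check how read time scales is to repeat the measurement at several buffer sizes. This is a rough sketch under some assumptions: samples are plain Python lists, not numpy arrays; the buffer sizes are arbitrary; and `fill` and `read_all` are hypothetical helpers. If msgpack is not installed it falls back to streaming pickles, which is a stand-in and not the method used above:

```python
import pickle
import random
import timeit
from io import BytesIO

try:
    import msgpack

    def dumps(obj):
        return msgpack.packb(obj, use_bin_type=True)

    def read_all(buf):
        """Rewind and decode every message stored in the buffer."""
        buf.seek(0)
        return list(msgpack.Unpacker(buf, raw=False))

except ImportError:  # fallback so the sketch runs with the standard library only
    dumps = pickle.dumps

    def read_all(buf):
        """Rewind and unpickle objects until the buffer is exhausted."""
        buf.seek(0)
        out = []
        while True:
            try:
                out.append(pickle.load(buf))
            except EOFError:
                return out


def fill(n_samples, sample):
    """Write n_samples serialized copies of sample into a fresh buffer."""
    buf = BytesIO()
    for _ in range(n_samples):
        buf.write(dumps(sample))
    return buf


sample = [random.random() for _ in range(20)]  # one 20-value sample
timings = {}
for n_samples in (1, 17, 170, 1700):
    buf = fill(n_samples, sample)
    timings[n_samples] = timeit.timeit(lambda: read_all(buf), number=100) / 100
    print(f"{n_samples} samples: {timings[n_samples] * 1e6:.1f} us per read")
```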
It's straightforward to try the system pipe again, e.g.
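import numpy as np
import msgpack
import msgpack_numpy as m
from multiprocessing import Pipe

m.patch()
a, b = Pipe()
msg = msgpack.packb(np.random.random((20)), use_bin_type=True)
a.send_bytes(msg)
msg2 = b.recv_bytes()  # recv_bytes takes an optional int maxlength, not the message
unpacker = msgpack.Unpacker()
unpacker.feed(msg2)
[i for i in unpacker]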
(Incomplete thoughts) I like the idea of message-oriented inter-process communication, which might be easier to understand than shared memory.
See msgpack + msgpack-numpy, mmap (?), ...
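To make the message-oriented idea concrete, here is a minimal sketch using `multiprocessing.Pipe`, where each `send_bytes` call arrives as one whole message on the other end. Assumptions: `pickle` stands in for the serializer (`msgpack.packb`/`unpackb` would slot into the same places), a `None` message marks the end of the stream, and the producer runs in a thread only to keep the example short. The same `Connection` calls apply when the producer is a separate `multiprocessing.Process`.

```python
import pickle
import threading
from multiprocessing import Pipe


def producer(conn, n_messages):
    """Send n_messages framed messages, then a None marker to signal the end."""
    for i in range(n_messages):
        sample = [float(i)] * 20  # stand-in for one 20-value sample
        conn.send_bytes(pickle.dumps(sample))
    conn.send_bytes(pickle.dumps(None))


def consume(conn):
    """Receive whole messages until the end marker arrives."""
    received = []
    while True:
        obj = pickle.loads(conn.recv_bytes())
        if obj is None:
            return received
        received.append(obj)


# duplex=False gives a receive-only end and a send-only end.
reader, writer = Pipe(duplex=False)
thread = threading.Thread(target=producer, args=(writer, 17))
thread.start()
samples = consume(reader)
thread.join()
print(len(samples), "messages received")
```

The reader never has to track offsets or locks: message boundaries come from the pipe itself, which is the main thing shared memory would leave to the caller.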
Two things to decide: