cboulay opened 2 years ago
The previously first idea is a dud. `send_buffer::push_sample` copies a `p_sample`, i.e. a pointer, not the actual data. This is cheap and not worth optimizing. I've removed it.
`send_buffer_size` merged in #145
I understand the over-the-wire protocol a little better now. We can't push a chunk of data through directly unless it has been formatted a priori as

```
1|2 (ts) ch0 ch1 ... chx 1|2 (ts) ch0 ch1 ... chx ...
```

where `1|2` is a `uint8_t` flag to indicate whether or not a timestamp follows, `(ts)` is an optional `double` timestamp depending on the flag, and `chx` are sample values of any type.
So if the data provider encodes their data that way then it can push all the way to asio with no copies. This also assumes endianness compatibility between sender and receiver!
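To make the layout above concrete, here is a minimal Python sketch of an encoder for that wire format. The flag values (`1` = no timestamp, `2` = timestamp follows) and little-endian byte order are assumptions based on LSL's conventions; only the general `flag (ts) ch0...chx` layout is stated above.

```python
import struct

# Assumed flag values: the text above only says the flag is "1|2".
TAG_DEDUCED_TIMESTAMP = 1      # no timestamp follows
TAG_TRANSMITTED_TIMESTAMP = 2  # a double timestamp follows

def encode_samples(samples, fmt="f"):
    """Encode (timestamp-or-None, channel_values) pairs in the wire layout:
    uint8 flag, optional double timestamp, then the channel data."""
    out = bytearray()
    for ts, chans in samples:
        if ts is None:
            out += struct.pack("<B", TAG_DEDUCED_TIMESTAMP)
        else:
            out += struct.pack("<Bd", TAG_TRANSMITTED_TIMESTAMP, ts)
        out += struct.pack("<%d%s" % (len(chans), fmt), *chans)
    return bytes(out)

buf = encode_samples([(123.25, [1.0, 2.0]), (None, [3.0, 4.0])])
# first sample: 1 flag + 8 ts + 2*4 data; second: 1 flag + 2*4 data
assert len(buf) == 17 + 9
```

Note the `<` in every `pack` call: as mentioned above, the format fixes a byte order, so sender and receiver endianness must agree.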
It's probably pretty rare that the data comes off the device formatted that way (with flags and timestamps interleaved in each sample), so instead we can use a gather-write with a vector of buffers. Thank you @tstenner for cluing me in on this.
```cpp
std::vector<boost::asio::const_buffer> bufs;
for (std::size_t samp_idx = 0; samp_idx < n_samples; ++samp_idx) {
	bufs.push_back(boost::asio::buffer(&flags[TAG_XXX_TIMESTAMP], 1));
	if (have_timestamp) bufs.push_back(boost::asio::buffer(&ts[samp_idx], sizeof(double)));
	// data assumed flat, sample-major; hence the samp_idx * nchan offset
	bufs.push_back(boost::asio::buffer(&data[samp_idx * nchan], sizeof(data[0]) * nchan));
}
for (auto &consumer : consumers_) {
	bytes_transferred = consumer->socket.send(bufs);
}
```
I just need access to the socket...
I figured out a way to do the blocking synchronous write with zero copies.
With 2,000 channels at 30,000 samples per second, CPU utilization on the outlet process decreases from 18.3% to 5% on my desktop (i5-8400). Pretty good!
Unfortunately I can't test what this means in practical terms of "what does this now enable us to do that we couldn't before?" because I'm bottle-necked either by the network or CPU usage of the receiver application. I guess that's a good problem to have. After I clean this up, I think the next step will be to work on optimizing the inlet.
@tstenner provided me with a one-liner that can mimic an inlet with almost no overhead for testing purposes. This works well in the new synchronous-write mode. But for some reason, in the original asynchronous mode, the outlet process consumes much more CPU using this one-liner than it would if I were using a normal inlet application. (For reference, it's spending most of its time in `lsl::sample::serialize_channels`.) Unfortunately I can't draw any conclusions about gains here because I don't know what the limits are in asynchronous mode.
One-liner for posterity:
```shell
nc -t localhost 16573 < handshake.dmp > /dev/null
```
Where `handshake.dmp` is a file with the following contents:

```
LSL:streamfeed/110 ce67fa29-595e-43e2-adce-b10283840b4c
Native-Byte-Order: 1234
Endian-Performance: 6.14477e+06
Has-IEEE754-Floats: 1
Supports-Subnormals: 0
Value-Size: 8
Data-Protocol-Version: 110
Max-Buffer-Length: 300
Max-Chunk-Length: 0
Hostname: chad-ubuntu-18
Source-Id: example-SendDataInChunks
Session-Id: default
```
The long hex code in the first line has to be replaced with the stream UID, which can be obtained from `outlet.info().uid()`. `Source-Id` and `Hostname` should also be changed.
> But for some reason in the original asynchronous mode, the outlet process consumes much more CPU using this one-liner than it would if I were using a normal inlet application.
That's because the `Value-Size` doesn't match the stream's value size (2 in your case), so the data protocol silently gets downgraded to 100. This can be circumvented with a small script out of the you-shouldn't-do-that-unless-you-have-to department:
```python
import pylsl

# Generate a handshake packet to stress-test or debug an outlet.
# Usage: Generate_Handshake.py my_stream_name | nc -t $outlet_hostname $port -W 1
#   $port is usually 16573
#   omit -W 1 to receive the stream data
#   add > /dev/null to discard the received data immediately


def lsl_handshake(streamname, endian=1234, version=110, maxbuflen=300, maxchunklen=0):
    stream = pylsl.resolve_byprop('name', streamname, 1, 5)[0]
    # bytes per value, indexed by the pylsl channel_format constant
    value_sizes = [0, 4, 8, 32, 4, 2, 1, 8]
    fields = {
        'Native-Byte-Order': str(endian),
        'Endian-Performance': '0',
        'Has-IEEE754-Floats': '1',
        'Supports-Subnormals': '0',
        'Value-Size': str(value_sizes[stream.channel_format()]),
        'Data-Protocol-Version': str(version),
        'Max-Buffer-Length': str(maxbuflen),
        'Max-Chunk-Length': str(maxchunklen),
        'Hostname': stream.hostname(),
        'Session-Id': 'default'
    }
    print('LSL:streamfeed/110 ' + stream.uid())
    for key, value in fields.items():
        print(key, value, sep=': ')
    print('\r\n\r\n')


if __name__ == '__main__':
    import sys
    lsl_handshake('1' if len(sys.argv) < 2 else sys.argv[1])
```
For the first run, you should check with the `-W 1` option that the parameters are interpreted correctly.
`push_chunk` -- memcpy the entire chunk instead of enqueueing sample-by-sample

I'm guessing we would need to transform the `sample` class so the `data_` member was a pointer to an address within a larger chunk-sized buffer somewhere; then adjacent (in time) samples' data would occupy contiguous space in memory. `sample::save_streambuf` (in `client_session::transfer_samples_thread`) appears to be compatible with this, and we could get even more benefit if we augment `save_streambuf` with an argument for `n_samples = 1` and `save_raw`'s third argument will be `datasize() * n_samples`. This actually looks a little more plausible than I originally thought. It's still a pretty big endeavour.
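The idea of having each sample's `data_` point into one larger chunk buffer can be sketched in Python with memoryviews. This is only an illustration of the memory layout, not the C++ change itself; the names (`chunk`, `samples`, `stride`) are hypothetical.

```python
import struct

nchan, n_samples, value_size = 4, 3, 4       # e.g. 4 float32 channels
chunk = bytearray(n_samples * nchan * value_size)  # one contiguous allocation

# Each "sample" is a zero-copy view into the chunk, standing in for a
# sample object whose data_ member points into the larger buffer.
stride = nchan * value_size
samples = [memoryview(chunk)[i * stride:(i + 1) * stride] for i in range(n_samples)]

# Writing through a view fills the shared chunk in place...
struct.pack_into("<4f", samples[1], 0, 1.0, 2.0, 3.0, 4.0)

# ...so serializing all n_samples becomes a single contiguous write of
# datasize() * n_samples bytes instead of n_samples separate copies.
assert bytes(chunk[stride:2 * stride]) == struct.pack("<4f", 1.0, 2.0, 3.0, 4.0)
```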
`boost::asio::socket_base::send_buffer_size` -- configurable via lsl_api.cfg

This isn't really an optimization so much as a parameter to allow very-high-throughput outlets not to choke on bursty inlets (whether due to the network or to CPU usage of the inlet process).
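At the socket level, that asio option maps to the `SO_SNDBUF` socket option. A minimal Python sketch (the corresponding lsl_api.cfg key name is not specified here):

```python
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Request a larger kernel send buffer so a bursty or slow reader doesn't
# immediately back-pressure the writer. This is what
# boost::asio::socket_base::send_buffer_size sets under the hood.
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4 * 1024 * 1024)
# The kernel may round or clamp the value (e.g. Linux doubles the request
# and caps it at net.core.wmem_max), so read back the effective size.
effective = s.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
assert effective > 0
s.close()
```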
Thoughts? @tstenner @chkothe