One thing that occurs to me is that we can increase queue depths. Queue depths can be used to help deal with bursts of multiple messages, but won't help a sustained case where your outbound message throughput is more than the subscriber can consume.
We could invent an option to permit backpressure to form at the socket as well, such that we do not permit the application to move forward unless we are able to deliver the message to all connected sockets. For well behaved applications that would reduce or eliminate message loss. It would however permit a slow subscriber (which might be an intentional bad actor) to affect the publisher's ability to send messages.
Right now, we have a non-tunable message queue depth of 16, meaning we can burst up to 16 messages per subscriber before we drop. I can imagine easily making that a tunable value to provide for better buffering of bursts.
I'd be willing to also contemplate a tunable (off-by-default) that applied backpressure up to the socket, slowing the publisher down if the sender cannot deliver data. We could also provide a timeout, so that we only keep trying to send for a maximum period of time, limiting the damage that a slow subscriber can do. Implementing that would be a fair (though not herculean) amount of effort.
If this is something that folks wanted for a commercial project, then I'd be willing to make these changes as a work for hire. Otherwise we should file issues for them, and I'll consider them amongst the other work I have outstanding.
One thing that occurs to me is that we can increase queue depths. Queue depths can be used to help deal with bursts of multiple messages, but won't help a sustained case where your outbound message throughput is more than the subscriber can consume.
I think this would be a nice feature. Is it possible that the default zmq queue depth is bigger than the one of nng?
Still, if one takes care to receive faster than one sends (possibly batchwise, inside a loop that does other work in between), an increased queue depth should be a possible cure for this problem. However, I also believe that tunable queue depths are superior to fixed ones, as they would allow the user to adjust this setting to their specific needs:
Right now, we have a non-tunable message queue depth of 16, meaning we can burst up to 16 messages per subscriber before we drop. I can imagine easily making that a tunable value to provide for better buffering of bursts.
If I understand this correctly, as of the current implementation it should be possible for PUB to send up to 16 msgs at once, and then for SUB to receive all of them? Any idea why, in the burst example, the SUB only receives the first msg when PUB is given time to send each of the 4 msgs? Shouldn't it, as fewer than 16 are sent, be able to receive all of them?
We could invent an option to permit backpressure to form at the socket as well, such that we do not permit the application to move forward unless we are able to deliver the message to all connected sockets. For well behaved applications that would reduce or eliminate message loss. It would however permit a slow subscriber (which might be an intentional bad actor) to affect the publisher's ability to send messages.
Here I believe that, combined with a tunable queue depth, this backpressure could be quite beneficial. If possible, I would, however, still consider dropping messages if the queue is full, making the whole thing independent of the subscriber speeds. If possible from a programming point of view, I would suggest that this backpressure only affect the PUB if the msg was not delivered into the queue (not to the SUB), to "guarantee" publishing. After all, if an application makes the subscriber harvest the queue slower than it is filled and messages are dropped due to overflow of the queue, this is a flaw in the design of that application. However, if the queue is not filled because of missing backpressure (waiting for the send to have queued the msg), I would consider this a flaw in the message system (my personal opinion).
I'd be willing to also contemplate a tunable (off-by-default) that applied backpressure up to the socket, slowing the publisher down if the sender cannot deliver data. We could also provide a timeout, so that we only keep trying to send for a maximum period of time, limiting the damage that a slow subscriber can do. Implementing that would be a fair (though not herculean) amount of effort.
Would such an implementation in the end represent some kind of multicast REQ/REP? I am not sure whether this would not be overkill for PUB/SUB, which is stated to be "best effort". However, I believe that "guaranteed" publishing to the queue is very important.
Lot to think about here. There are actually potentially multiple queues we can engage for pub sub -- a socket-wide queue, and a per-pipe one. Having the extra queues (and depth) increases latency, which in some circumstances is actually worse than just dropping the message outright. This probably needs to be tunable, really.
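For illustration, the socket-wide buffers can already be tuned from pynng; a minimal sketch, assuming pynng maps nng's NNG_OPT_SENDBUF / NNG_OPT_RECVBUF to send_buffer_size / recv_buffer_size options (counted in messages), and noting that this does not change the fixed per-pipe pub/sub queue discussed above:
import pynng
import time

# Sketch: enlarge the socket-wide message buffers to absorb bursts.
# Assumes pynng exposes nng's buffer depth options as the properties below.
pub = pynng.Pub0(listen='tcp://127.0.0.1:5555')
sub = pynng.Sub0(dial='tcp://127.0.0.1:5555')
pub.send_buffer_size = 128   # buffer up to 128 outgoing messages
sub.recv_buffer_size = 128   # buffer up to 128 incoming messages
sub.subscribe(b'')
time.sleep(1)  # give the dialer time to connect

for i in range(32):
    pub.send(bytes(str(i), encoding='utf-8'))  # a burst larger than 16

# The per-pipe pub/sub queue (16 at the time of writing) still applies,
# so a sustained overload can drop messages despite larger socket buffers.
sub.close()
pub.close()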
I just looked a bit more closely at your code.
I think there is another bug here, which may relate to how nng returns control back to the caller on send. nng runs stuff in a background thread, so your send operation can take place asynchronously after you get control back from the call (there is a send buffer).
The problem is that the message has almost certainly not arrived at the receiver by then. So your attempt to do a receive immediately is probably doomed. This isn't a lost message, it's an incorrect assumption about strict timing. This isn't like a normal TCP connection, where delivery by one side necessarily should indicate that data has already arrived and been acked by the kernel at the receive side.
The tiny sleep you added injects a context switch, which probably lets the underlying threads run to have a chance to get the stuff where it needs to be.
The best way to measure message loss is to run the send and receive as separate threads, or separate programs. What I usually do is send an increasing counter, and then count gaps on the receive side. These can run as separate processes.
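For what it's worth, a minimal sketch of that gap-counting approach as two separate scripts (the address and topic are arbitrary; it assumes pynng's blocking recv):
# sender.py -- run as its own process; publishes an increasing counter
import struct
import time
import pynng

pub = pynng.Pub0(listen='tcp://127.0.0.1:5555')
time.sleep(1)  # let subscribers connect
i = 0
try:
    while True:
        pub.send(struct.pack('>Q', i))  # 8-byte big-endian counter
        i += 1
finally:
    pub.close()

# receiver.py -- run as a separate process; counts gaps in the counter
import struct
import pynng

sub = pynng.Sub0(dial='tcp://127.0.0.1:5555')
sub.subscribe(b'')
last = None
lost = 0
try:
    while True:
        (n,) = struct.unpack('>Q', sub.recv())  # blocking receive
        if last is not None and n > last + 1:
            lost += n - last - 1                # messages missed in between
        last = n
        print(f'received up to {n}, lost {lost}', end='\r')
except KeyboardInterrupt:
    sub.close()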
All the other stuff I said about queuing still applies, but it's also the case that your current measurement is completely flawed based on false assumptions of linkage that simply do not apply. Sorry.
Thanks for your reply.
The best way to measure message loss is to run the send and receive as separate threads, or separate programs. What I usually do is send an increasing counter, and then count gaps on the receive side. These can run as separate processes.
This is how I originally encountered the message loss: looping send and receive in two independent processes. I – admittedly – just did not manage to reflect this correctly with my example code. As the behavior was the same, namely the loss of messages, I assumed it was a proper representation.
I am now quite certain, however, that the originally encountered message loss is due to the fixed queue length: my pub and my sub only have matching in- and output on a temporal mean, as the publisher sends at a constant rate while the subscriber receives in a fluctuating manner (receiving all that can be received, spending time on processing, and going back to receiving). The usual number of messages received in one loop is close to 300, measured with zmq.
I see the flaw in the design of my first code examples. I made small adaptations that allow checking for message loss or too-early receives.
Removing the block=False option allows full retrieval of the messages, as the receive has to wait until the background thread has actually sent them.
import pynng
import zmq  # not used in this snippet
import time

pub = pynng.Pub0(listen='tcp://*:5555')
sub = pynng.Sub0(dial='tcp://localhost:5555')
sub.subscribe(b'0')
time.sleep(1)  # give the dialer time to connect

i_send = 0
i_recv = 0
try:
    while True:
        i_send += 1
        pub.send(b'0')
        msg = sub.recv()  # blocking receive
        i_recv += 1
        print(f'pynng: Lost {i_send - i_recv} of {i_send} msgs '
              + '({0:3.3f} % loss)'.format((i_send - i_recv) / i_send * 100)
              + ' [Exit with Ctrl+C]', end='\r')
        if i_send >= 10**6:
            break
except KeyboardInterrupt:
    pass
finally:
    sub.close()
    pub.close()
    time.sleep(1)
exit()
What is still puzzling me, however, is the second example. Here the break in the original example actually made things worse, and all but one message were lost. I have an updated – also blocking – code example that shows the loss of a message (but I don't know how or why, just that the second receive, after a couple of messages – ranging from 40 to 4000 – does not receive anything and thus blocks):
import pynng
import zmq  # not used in this snippet
import time

burstsize = 2

pub = pynng.Pub0(listen='tcp://*:5555')
sub = pynng.Sub0(dial='tcp://localhost:5555')
sub.subscribe(b'')
time.sleep(1)  # give the dialer time to connect

i_send = 0
i_recv = 0
try:
    while True:
        msgs = []
        for i in range(burstsize):
            i_send += 1
            pub.send(bytes(str(i), encoding='utf-8'))
        for i in range(burstsize):
            msgs += [sub.recv()]  # blocking receive
            i_recv += 1
            print(f'pynng: Lost {i_send - i_recv} of {i_send} msgs '
                  + '({0:3.3f} % loss) '.format((i_send - i_recv) / i_send * 100)
                  + f'Burst: Recvd {len(msgs)} of {burstsize} msgs '
                  + '({0:3.3f} % loss) '.format((burstsize - len(msgs)) / burstsize * 100)
                  + '[Exit with Ctrl+C]', end='\r')
        if i_send >= 10**6:
            break
except KeyboardInterrupt:
    pass
finally:
    sub.close()
    pub.close()
    time.sleep(1)
    print()
exit()
Blocking receive is also wrong, because if a message is dropped, you will block forever.
I'm sure (I've got some other tests of this that I've written) that messages get dropped if you publish too hard. I'm sure that we can adjust queueing strategy to help, and even invent a backpressure mechanism.
Having said all that; any design which pushes message out as hard as it can over pub, is problematic at best. PUB/SUB is lossy, and not meant to handle vast amounts of inbound traffic by design.
However, I've also done some basic work and am convinced that our handling of SUB is somehow inefficient, as I'm seeing far fewer messages delivered than I would have expected. I've yet to find sufficient time to properly diagnose this -- I'm sure there is a bottleneck somewhere.
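One middle ground for test code is a receive timeout, so a dropped message shows up as a timeout rather than an eternal hang. A sketch, assuming pynng exposes nng's receive timeout as a recv_timeout option (in milliseconds) and raises pynng.Timeout when it expires:
import pynng

sub = pynng.Sub0(dial='tcp://127.0.0.1:5555')
sub.subscribe(b'')
sub.recv_timeout = 100  # ms; assumed pynng option mapping nng's receive timeout

try:
    msg = sub.recv()        # waits at most ~100 ms
except pynng.Timeout:       # assumed exception name for an expired receive timeout
    msg = None              # count this as a dropped (or very late) message
finally:
    sub.close()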
Blocking receive is also wrong, because if a message is dropped, you will block forever.
I agree for a real application; however, shouldn't it demonstrate that no messages are dropped due to receive being called too early in the example code?
Still, I wonder how the message in the second example gets dropped. After all, bursts of only 2 messages can hardly be called "vast amounts" of inbound traffic, can they? And I agree, the publisher pushes out 2 messages at once, but due to the blocking receive that follows, it should have time to relax (like in the first example). Shouldn't the queue be able to buffer these up to its queue depth?
Thanks anyway for investigating!
The problem is that even the second example does not block on receive. This means that it's possible for messages to be backed up in the middle plumbing somewhere, causing backpressure.
If you want to mimic this properly, you should have your receive iterate for the same number of times that the send did (burst size) and use blocking. This will let the plumbing drain complete fully between bursts.
If you want to mimic this properly, you should have your receive iterate for the same number of times that the send did (burst size) and use blocking. This will let the plumbing drain complete fully between bursts.
Isn't this what my example – in the comment, not the OP – is doing? It should send 2 messages and then receive 2 messages with blocking calls.
Now I see, too hard to follow. :-) That example looks ok. I can't think of any normal reason for that to drop messages. I'll have to try it out.
Now I see, too hard to follow. :-)
Sorry, I edited the posts to give a better overview and thus enhance readability.
I see your python code is doing
msgs += [sub.recv()]
I'm not sure what the intended purpose here is... is this a list comprehension?
The other thing is that it seems if your burst size is 2, you're going to print a warning on receipt of every 1st message. It seems like you should move your print to outside of the 2nd for loop.
Given that, if sub is blocking, then it seems literally impossible for this to record message drops.
If your sub is not blocking (and forgive me, as I'm not familiar with the pynng semantics here, I work in C primarily), then you can (will?) lose messages. The issue is that even though we have "sent" a message in the PUB, the socket takes time to effect the delivery to the remote side. So you would need to inject a brief sleep after the send and before the receive if you wanted to be 100% sure of avoiding message drops.
Normally strict flow like this is not used -- as normally the sender and receiver are not part of a single main thread, so the fact that there is some asynchronous stuff happening under the covers doesn't make a difference. If you do have a requirement for this kind of flow control, then you need to make sure that your receive is a blocking receive.
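A sketch of the two patterns described above – a brief sleep before a non-blocking receive, versus a plain blocking receive. The TryAgain exception name is my assumption for how pynng reports a receive that would block:
import time
import pynng

pub = pynng.Pub0(listen='tcp://127.0.0.1:5555')
sub = pynng.Sub0(dial='tcp://127.0.0.1:5555')
sub.subscribe(b'')
time.sleep(1)  # let the dialer connect

# Pattern 1: non-blocking receive -- sleep briefly after the send so the
# background delivery threads get a chance to run before we try to read.
pub.send(b'hello')
time.sleep(0.001)
try:
    msg = sub.recv(block=False)
except pynng.TryAgain:  # assumed exception for "nothing available yet"
    msg = None

# Pattern 2: blocking receive -- no sleep needed, recv() waits for delivery.
pub.send(b'world')
msg = sub.recv()

sub.close()
pub.close()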
msgs += [sub.recv()]
It adds the received message as a new element to the list of messages, which is emptied after running through both for loops. Using this, together with the print of the message receipt (in the second for loop), allows checking at which message the drop happened (a counter would work equally well).
Given that, if sub is blocking, then it seems literally impossible for this to record message drops.
I agree, but still it does. This is the output of several runs of the blocking burst example (https://github.com/nanomsg/nng/issues/788#issuecomment-440006059).
>python pynng_test_burst_block.py
pynng: Lost 1 of 58 msgs (1.724 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]]
>python pynng_test_burst_block.py
pynng: Lost 1 of 46 msgs (2.174 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]]
>python pynng_test_burst_block.py
pynng: Lost 1 of 472 msgs (0.212 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]
>python pynng_test_burst_block.py
pynng: Lost 1 of 516 msgs (0.194 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]
>python pynng_test_burst_block.py
pynng: Lost 1 of 62 msgs (1.613 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]]
>python pynng_test_burst_block.py
pynng: Lost 1 of 56 msgs (1.786 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]]
>python pynng_test_burst_block.py
pynng: Lost 1 of 470 msgs (0.213 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]
>python pynng_test_burst_block.py
pynng: Lost 1 of 8 msgs (12.500 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]
>python pynng_test_burst_block.py
pynng: Lost 1 of 494 msgs (0.202 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]
>python pynng_test_burst_block.py
pynng: Lost 1 of 500 msgs (0.200 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]
>python pynng_test_burst_block.py
pynng: Lost 1 of 186 msgs (0.538 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]
>python pynng_test_burst_block.py
pynng: Lost 1 of 50 msgs (2.000 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]]
>python pynng_test_burst_block.py
pynng: Lost 1 of 68 msgs (1.471 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]]
>python pynng_test_burst_block.py
pynng: Lost 1 of 478 msgs (0.209 % loss) Burst: Recvd 1 of 2 msgs (50.000 % loss) [Exit with Ctrl+C]
It always gets stuck after a varying number of message bursts that it received correctly. Getting stuck must mean that a message was dropped and the blocking receive keeps waiting for that message. After it blocked permanently, the task was killed to restart it.
If your sub is not blocking (and forgive me, as I'm not familiar with the pynng semantics here, I work in C primarily), then you can (will?) lose messages. The issue is that even though we have "sent" a message in the PUB, the socket takes time to effect the delivery to the remote side. So you would need to inject a brief sleep after the send and before the receive if you wanted to be 100% sure of avoiding message drops.
True, this is why the first example (OP) – with its non-blocking receives – worked after adding the sleeps. However, in the second burst example (https://github.com/nanomsg/nng/issues/788#issuecomment-440006059), the receive is blocking (the default in pynng).
Normally strict flow like this is not used -- as normally the sender and receiver are not part of a single main thread, so the fact that there is some asynchronous stuff happening under the covers doesn't make a difference. If you do have a requirement for this kind of flow control, then you need to make sure that your receive is a blocking receive.
The strict flow was just for the sake of the minimal examples – blocking in the second one – to show the problem it has.
If you're able to do so, have a look at the subperf branch. It has a refactor of the subscriber code that may perform better.
The code has been integrated into master, and I'm finding vastly improved subscriber performance now. There is still more work to do here, but you should find far fewer message drops now.
I'm intending to close this absent response.
First of all, thanks for your efforts! Second, sorry for not replying – I was quite tied up with work the last couple of weeks. I will try to have a look at it in the next few days; however, I will have to figure out how to use this specific nng version with pynng.
Unfortunately, it's not very straightforward to build with a custom version of nng right now in pynng. It is possible though! There's an issue for this, but I have not worked on it yet: https://github.com/codypiersall/pynng/issues/10
The setup script for pynng checks that the library is present, and if it is present then it won't build. So as long as you build the nng library before building pynng, you can use a version other than the default. Something like the following should work:
cd ~/dev # or wherever
git clone https://github.com/codypiersall/pynng
cd pynng
git clone https://github.com/nanomsg/nng
cd nng && mkdir build && cd build && CFLAGS=-fPIC cmake .. && CFLAGS=-fPIC cmake --build .
cd ../..
pip install -e .
There's a decent chance that there's a typo in what I just typed out above, but the idea is right.
You could also clone pynng, then change the NNG_REVISION variable to whatever you need. Then the steps would be something like this:
cd ~/dev # or wherever
git clone https://github.com/codypiersall/pynng
cd pynng
vim setup.py # change the NNG_REVISION to "master" or whatever you need
pip install -e .
I prefer method 1. Eventually it would be nice to modify the setup script to point to an existing nng library, but whenever I briefly looked into doing it in a setuptools-friendly way it didn't seem straightforward. I may eventually just throw something hacky together with argparse.
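A hypothetical sketch of that hacky approach – peel a custom flag out of sys.argv with argparse before setuptools parses the command line. The --nng-prefix name and the build wiring are purely illustrative, not part of pynng's setup.py today:
# setup.py fragment (hypothetical): accept a path to a pre-built nng
import argparse
import sys

parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--nng-prefix', default=None,
                    help='directory of an already-built nng to link against')
args, remaining = parser.parse_known_args(sys.argv[1:])
sys.argv = [sys.argv[0]] + remaining  # hand the rest through to setuptools

if args.nng_prefix:
    # ...skip building the bundled nng and point the cffi extension here...
    print(f'using existing nng from {args.nng_prefix}')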
Hmmm, I realized there's an easier way, since I already have a build_nng.sh script (or a build_nng.bat script if you're on Windows). Something like this is slightly more straightforward:
cd ~/dev # or wherever
git clone https://github.com/codypiersall/pynng && cd pynng
./build_nng.sh master # if you need a specific revision, swap out "master" with the revision.
pip install -e .
Great! I will give it a try
@codypiersall Initially I went for Method 2, as the batch version of build_nng from Method 3 requires an additional – first – argument, the "CMake compiler generator". From there I got the value needed to run Method 3:
build_nng.bat "Visual Studio 14 2015 Win64" c036b3e4a365a966215e383c1130c66d96aa917b
The build seems to run OK but still gives 3 warnings:
Build succeeded.
"C:\pynng\nng\build\ALL_BUILD.vcxproj" (default target) (1) ->
"C:\pynng\nng\build\tests\cplusplus_pair.vcxproj" (default target) (26) ->
(ClCompile target) ->
C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\include\xlocale(341): warning C4530: C++ exception handler use
d, but unwind semantics are not enabled. Specify /EHsc [C:\pynng\nng\build\tests\cplusplus_pair.vcxproj]
C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\include\exception(359): warning C4577: 'noexcept' used with no
exception handling mode specified; termination on exception is not guaranteed. Specify /EHsc [C:\pynng\nng\build\tests
\cplusplus_pair.vcxproj]
"C:\pynng\nng\build\ALL_BUILD.vcxproj" (default target) (1) ->
"C:\pynng\nng\build\tests\httpserver.vcxproj" (default target) (31) ->
C:\pynng\nng\tests\httpserver.c(135): warning C4996: '_strdup': The POSIX name for this item is deprecated. Instead,
use the ISO C and C++ conformant name: _strdup. See online help for details. [C:\pynng\nng\build\tests\httpserver.vcxpr
oj]
3 Warning(s)
0 Error(s)
So far so good; however, running pip install fails:
C:\pynng>pip install -e .
Obtaining file:///C:/pynng
Requirement already satisfied: cffi in c:\python\python37\lib\site-packages (from pynng==0.4.0+dev) (1.12.3)
Requirement already satisfied: sniffio in c:\python\python37\lib\site-packages (from pynng==0.4.0+dev) (1.1.0)
Requirement already satisfied: pycparser in c:\python\python37\lib\site-packages (from cffi->pynng==0.4.0+dev) (2.19)
Installing collected packages: pynng
Running setup.py develop for pynng
ERROR: Complete output from command 'c:\python\python37\python.exe' -c 'import setuptools, tokenize;__file__='"'"'C:\\pynng\\setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' develop --no-deps:
ERROR: running develop
running egg_info
writing pynng.egg-info\PKG-INFO
writing dependency_links to pynng.egg-info\dependency_links.txt
writing requirements to pynng.egg-info\requires.txt
writing top-level names to pynng.egg-info\top_level.txt
reading manifest file 'pynng.egg-info\SOURCES.txt'
reading manifest template 'MANIFEST.in'
warning: no files found matching '8.txt'
writing manifest file 'pynng.egg-info\SOURCES.txt'
running build_ext
generating cffi module 'build\\temp.win-amd64-3.7\\Release\\pynng._nng.c'
creating build
creating build\temp.win-amd64-3.7
creating build\temp.win-amd64-3.7\Release
building 'pynng._nng' extension
creating build\temp.win-amd64-3.7\Release\build
creating build\temp.win-amd64-3.7\Release\build\temp.win-amd64-3.7
creating build\temp.win-amd64-3.7\Release\build\temp.win-amd64-3.7\Release
C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\BIN\x86_amd64\cl.exe /c /nologo /Ox /W3 /GL /DNDEBUG /MD -Inng/include -Ic:\python\python37\include -Ic:\python\python37\include "-IC:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\INCLUDE" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.10240.0\ucrt" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.10240.0\shared" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.10240.0\um" "-IC:\Program Files (x86)\Windows Kits\10\include\10.0.10240.0\winrt" /Tcbuild\temp.win-amd64-3.7\Release\pynng._nng.c /Fobuild\temp.win-amd64-3.7\Release\build\temp.win-amd64-3.7\Release\pynng._nng.obj
pynng._nng.c
build\temp.win-amd64-3.7\Release\pynng._nng.c(9670): warning C4028: formal parameter 2 different from declaration
build\temp.win-amd64-3.7\Release\pynng._nng.c(9714): warning C4028: formal parameter 2 different from declaration
creating C:\pynng\build\lib.win-amd64-3.7
creating C:\pynng\build\lib.win-amd64-3.7\pynng
C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\BIN\x86_amd64\link.exe /nologo /INCREMENTAL:NO /LTCG /DLL /MANIFEST:EMBED,ID=2 /MANIFESTUAC:NO /LIBPATH:c:\python\python37\libs /LIBPATH:c:\python\python37\PCbuild\amd64 "/LIBPATH:C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\LIB\amd64" "/LIBPATH:C:\Program Files (x86)\Windows Kits\10\lib\10.0.10240.0\ucrt\x64" "/LIBPATH:C:\Program Files (x86)\Windows Kits\10\lib\10.0.10240.0\um\x64" Ws2_32.lib Advapi32.lib /EXPORT:PyInit__nng build\temp.win-amd64-3.7\Release\build\temp.win-amd64-3.7\Release\pynng._nng.obj ./nng/build/Release/nng.lib /OUT:build\lib.win-amd64-3.7\pynng\_nng.cp37-win_amd64.pyd /IMPLIB:build\temp.win-amd64-3.7\Release\build\temp.win-amd64-3.7\Release\_nng.cp37-win_amd64.lib
pynng._nng.obj : warning LNK4197: export 'PyInit__nng' specified multiple times; using first specification
Creating library build\temp.win-amd64-3.7\Release\build\temp.win-amd64-3.7\Release\_nng.cp37-win_amd64.lib and object build\temp.win-amd64-3.7\Release\build\temp.win-amd64-3.7\Release\_nng.cp37-win_amd64.exp
pynng._nng.obj : error LNK2001: unresolved external symbol nng_listener_setopt_uint64
pynng._nng.obj : error LNK2001: unresolved external symbol nng_dialer_setopt_uint64
build\lib.win-amd64-3.7\pynng\_nng.cp37-win_amd64.pyd : fatal error LNK1120: 2 unresolved externals
error: command 'C:\\Program Files (x86)\\Microsoft Visual Studio 14.0\\VC\\BIN\\x86_amd64\\link.exe' failed with exit status 1120
----------------------------------------
ERROR: Command "'c:\python\python37\python.exe' -c 'import setuptools, tokenize;__file__='"'"'C:\\pynng\\setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' develop --no-deps" failed with error code 1 in C:\pynng\
Any ideas? Is this pynng-related, or is there a flaw in my setup? Sorry for the pynng issue within nng.
@lcnittl It's not just your setup; I've confirmed that I am seeing it too. I'm guessing the breaking change from nng came in c3e0626613... there's some seriously nice magic happening in the macros there. What I have not yet figured out is why seemingly only nng_listener_setopt_uint64 and nng_dialer_setopt_uint64 end up being undefined.
Okay, I have confirmed that after running nng.c through the C preprocessor, the functions nng_dialer_setopt_uint64 and nng_listener_setopt_uint64 are indeed undefined. (Okay, we already knew this from the error message, I guess.) I'm not sure why they're undefined. As near as I can tell, the exact same thing should be happening with all the stuff that gets NNI_DEFTYPEDSET(...) in options.h. This is a separate issue from PUB/SUB handling though.
This is a separate issue from PUB/SUB handling though.
@codypiersall Absolutely!
I was now able to install it using 989c5e90d48066b12ad44ed903cfc163d8e89b29 (the one before c3e062661388f70386d6766e3ce648030af340ee), which should already include the updated PUB/SUB handling.
The code has been integrated into master, and I'm finding vastly improved subscriber performance now.
@gdamore I can confirm the huge increase in performance. The burst example (cf. https://github.com/nanomsg/nng/issues/788#issuecomment-440006059) now easily manages burst_sizes of 16, matching the queue depth you mentioned. It nearly made it with 17, but dropped a message after 761362 messages :-) – well, I don't think this is unexpected, as that surpasses the queue depth.
Thanks for the improvements, and sorry again for my long absence from this issue.
Glad to hear things are working for you @lcnittl!
I'll hopefully file an issue and a fix tomorrow for the undefined nng_listener_setopt_uint64 and nng_dialer_setopt_uint64. Turns out the fix is really simple: they're just not getting defined in NNI_LEGACY_DEFSETALL. The silly reason I didn't notice this before is that I was looking in NNI_DEFSETALL, thinking that NNI_LEGACY_DEFSETALL was a compatibility layer for legacy nanomsg. D'oh!
I ported some code from nanomsg to nng.
There I also ran into the issue where messages are lost when I send many (e.g. 64) messages in a loop.
It happens with pair and bus sockets also, regardless of "polled" or async receiving.
When I add a 1 µs delay between sends, it works fine.
Is this expected behavior? I haven't changed any socket options. Do I need to change some values to get the "old" behavior of nanomsg?
It was not a problem with the old nanomsg library. Sending 64 messages in a loop without delay worked fine there.
@qsodev the architecture is a lot different and legacy nanomsg was single threaded, so you probably couldn't as easily overwhelm the receiver. With nng it is much much easier to send faster than your receiver can receive.
bus is lossy, and you should not spam so hard.
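For illustration, the 1 µs pacing mentioned above looks like this in Python. A sketch with pair0; the address and the burst size of 64 mirror the report, and Python's sleep granularity is much coarser than 1 µs but still yields to the background threads:
import time
import pynng

tx = pynng.Pair0(listen='tcp://127.0.0.1:5556')
rx = pynng.Pair0(dial='tcp://127.0.0.1:5556')
time.sleep(1)  # let the connection come up

# Paced burst: a tiny pause between sends keeps the receiver from being
# overwhelmed, as reported above for the ported nanomsg code.
for i in range(64):
    tx.send(bytes([i]))
    time.sleep(0.000001)

for i in range(64):
    assert rx.recv() == bytes([i])  # pair0 delivers in order on a single pipe

rx.close()
tx.close()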
The new pub/sub seems to be working well, so I'm closing this.
When porting a script from zmq to pynng (much nicer python coding experience) to broadcast data (bursts of 4 direct follow-up messages per loop, approx. 60 loops/s, approx. 1 KB/s TCP traffic) from a PUB to two SUBs, I experienced an unmissable data/message loss that was not experienced with zmq.
In follow-up investigations on this topic I found some issues in the nanomsg repository describing this problem (nanomsg/nanomsg#283, nanomsg/nanomsg#390).
I was wondering if this "behavior" is still expected to happen with nng. Probably yes, as of #762 and the Conceptual Overview section of the nng manual:
NNG & Platform details
NNG: Using pynng (codypiersall/pynng@61c9f11) and the nng version shipped with it. OS: Windows 10 10.0.17134 (64-Bit). PL: Python 3.7.0
Expected Behavior
Not drop 0.5 % (or even 75 %) of messages.
Actual Behavior
Drops 0.5 % (or even 75 %) of messages.
Steps to Reproduce
The following python code compares the percentage of dropped messages between nng and zmq.
Single Message Example
Yielding quite reproducibly the following values:
Quite interestingly, zmq manages to handle this fast PUB/SUB two orders of magnitude better than nng. A short time.sleep() after send cures the loss in both cases, but dramatically slows down the script.
Message Burst Example
The following script simulates the beforehand-mentioned 4-message data bursts:
Delivering the following results:
Again, zmq is the definite winner. In the code example it seems quite random which messages are lost (each of the four different messages is received over time). However, as stated in the code, putting time.sleep() after send leads to 75 % data loss for nng, and the sole message that is received is the 1st of the 4 (1 out of 4 = 75 % loss).
Am I missing some setting here to define a buffer size? Is there any option to make the PUB wait/block until the message was really sent (just sent, not received)?
As stated by @gdamore in https://github.com/nanomsg/nanomsg/issues/390#issuecomment-98404170, a PUB/SUB socket was not the right choice to multicast "a lot of" data in nanomsg, which seemingly still is the case for nng.
What I thus wonder, and clearly do not understand: if nng's PUB/SUB is not the right choice, what else is?
As I have to send this data fast and lossless, this issue is the only reason why I have to stick with zmq for my application. (Ok, except for this https://github.com/codypiersall/pynng/issues/9#issuecomment-437969587 – even with some C code from @codypiersall.)
(Sorry for this extremely long issue :) )
Edit: Code example was 2 × the same. Changed the second to the real burst example.