nats-io / nats.c

A C client for NATS
Apache License 2.0

NATS Core + NATS C Client + NATS CLI message loss on fast CPUs #667

Closed NostraMagister closed 1 year ago

NostraMagister commented 1 year ago

Here is test code, including a temporary workaround, to reproduce how NATS Core with the C client loses messages on fast machines. The error is NOT consistent. On (slower) laptops fewer messages are lost than on 16/32/64-core AMD Threadripper CPUs.

Setup: the NATS CLI subscribes with sub ">" on a Windows machine, logged in with a ".creds" file. The NATS Server is installed on Ubuntu Linux.

Application: runs on Windows, logs in using a ".creds" file, and runs the code below to publish. I provide a 'case' statement to simulate both the error and the workaround. This routine is the minimum needed to reproduce the problem. I didn't include the login because it is based on a .creds file that contains my keys, so you must provide a connected natsConnection* nc.

It can be tested with a SINGLE message; it doesn't need any heavy load to fail. The NATS server is DEDICATED to this test (no other traffic) and the network is almost idle. CPU usage on the Windows systems was below 2% at the time of the test.

NATS C client header used: #include "nats.h", latest version from GitHub.

`#include <cstdio>
#include <string>
#include "nats.h"

natsStatus PublishIt( natsConnection* nc ) {
// Create subject
std::string subject = "test";

// Fill a buffer with data to send
char* buf = new char[1024];
int len = sprintf(buf, "This is a reliability test to see if NATS loses messages on fast systems and if possibly the provided buffer is cloned after the natsConnection_Publish() function already returned. If that is the case it would explain NATS high performance but while being unreliable depending on the underlying CPU speed and thread-lottery.");

// Publish
natsStatus nstat = natsConnection_Publish(nc, subject.c_str(), (const void*) buf, len);
if (nstat != NATS_OK) { printf("natsConnection_Publish() Failed"); delete[] buf; return nstat; }    // <<< never failed, always NATS_OK.

// Select the test according to the remarks next to the 'case' statements.
int selectTest = 3;
switch (selectTest)
{
    case 1: // This loses messages. NATS CLI doesn't display them.

        delete[] buf;
        break;
    case 2: // This is a memory leak BUT it NEVER loses any message and the above text appears on NATS CLI.
            // It will eventually run out of memory, of course, and isn't an acceptable solution.
            //
        // don't delete[] buf
        break;
    case 3: // This is a workaround that doesn't lose messages and NATS CLI shows the text, BUT it loses all the NATS performance advantages.
            // Presumably the added flush creates just enough time for 'buf' to be cloned or used by natsConnection_Publish().

        nstat = natsConnection_Flush(nc);
        if (nstat != NATS_OK) printf("NATS Flush Failed: %i", nstat); // <<< Flush never failed in any test.
        delete[] buf;
        break;
}
return nstat;

}`

I think either this is a synchronization problem, or the documentation of natsConnection_Publish() at http://nats-io.github.io/nats.c/group__conn_pub_group.html#gac0b9f7759ecc39b8d77807b94254f9b4 should mention that the provided buffer becomes owned by natsConnection_Publish().

NOTE: I tried adding a single flush with 2 sec timeout before disconnecting, but that doesn't work. The flush must be in the procedure above.

Thanks for looking into this.

kozlovic commented 1 year ago

.. should mention that the provided buffer becomes owned by natsConnection_Publish().

It is not. The library makes a copy, or possibly tries to write directly to the socket, but in no case does the library hold a reference to the user-provided buffer after the call returns.

See here, where the library writes the user buffer. In that function, you can see that it either appends to an internal buffer or writes to the socket.

I believe the issue is simply that if your application sends a single message and exits right away, the buffer has no chance of being flushed (that is, written to the socket), and this is why the subscriber does not receive it.

You don't have to flush every single message, but keep in mind that flushing happens in a background thread, or when the internal buffer reaches its capacity. Flushing before the application exits, or calling natsConnection_Close(), should work, so I am surprised when you say that you tried that and it doesn't.
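To make the two points above concrete (the caller's buffer is copied, and nothing reaches the wire until a flush), here is a minimal toy model in C++. It is only a sketch: `ToyConnection`, `publishAndFree`, `pendingBytes` and `wire` are hypothetical names invented for illustration, and this is not how nats.c is implemented internally.

```cpp
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Toy stand-in for a buffered client connection (hypothetical, NOT nats.c).
class ToyConnection {
public:
    // Copies the caller's bytes into an internal pending buffer and returns.
    // After this call the connection no longer refers to the caller's memory.
    void publish(const void* data, std::size_t len) {
        const char* p = static_cast<const char*>(data);
        pending_.insert(pending_.end(), p, p + len);
    }

    // Moves everything pending onto the "wire" (modelled here as a string).
    void flush() {
        wire_.append(pending_.begin(), pending_.end());
        pending_.clear();
    }

    const std::string& wire() const { return wire_; }
    std::size_t pendingBytes() const { return pending_.size(); }

private:
    std::vector<char> pending_;  // published but not yet written out
    std::string wire_;           // what a subscriber could actually see
};

// Publishes from a heap buffer and frees it immediately, like "case 1".
void publishAndFree(ToyConnection& conn, const char* text) {
    const std::size_t len = std::strlen(text);
    char* buf = new char[len];
    std::memcpy(buf, text, len);
    conn.publish(buf, len);
    delete[] buf;  // safe in this model: publish() already copied the bytes
}
```

In this model, freeing the buffer right after `publish()` is harmless. What decides whether the message is ever visible is whether `flush()` runs before the process goes away.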

NostraMagister commented 1 year ago

I revisited the flush before disconnect and did a regular flush, without timeout, and then std::this_thread::sleep_for((std::chrono::microseconds)10000);

If I do that, the final flush works. After your explanation that the flush is async, I figured that it needs a little time to execute.

I also looked into your comment about not doing a flush each time, so I ran a test that flushes only after the last message of a set of messages, and that works too. Then again, in practice one often does not know which message will be the last. I guess using a flush on a timer or so would solve it 100%.

Thank you for the attention you have given this.

kozlovic commented 1 year ago

@NostraMagister The library is already flushing based on a notification that there is something to flush, so you should not have to flush on a timer. What you possibly need to do is flush before closing the connection, before your application exits.
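One way to make sure a final flush always runs before the connection is closed, without knowing which message is the last, is a small scope guard. The following is a sketch only: `FlushThenClose` is a hypothetical helper, not part of nats.c, and it takes the flush and close actions as plain callables so it does not depend on any library.

```cpp
#include <functional>
#include <utility>

// Hypothetical helper (not part of nats.c): runs the flush action and then
// the close action when the guard goes out of scope.
class FlushThenClose {
public:
    FlushThenClose(std::function<void()> flush, std::function<void()> close)
        : flush_(std::move(flush)), close_(std::move(close)) {}

    // Non-copyable so the actions run exactly once.
    FlushThenClose(const FlushThenClose&) = delete;
    FlushThenClose& operator=(const FlushThenClose&) = delete;

    ~FlushThenClose() {
        if (flush_) flush_();  // push out anything still buffered
        if (close_) close_();  // only then tear the connection down
    }

private:
    std::function<void()> flush_;
    std::function<void()> close_;
};
```

With nats.c, the two callables could be lambdas that call `natsConnection_Flush(nc)` and `natsConnection_Close(nc)` and log any status other than `NATS_OK`. The callables should not throw, since they run inside a destructor.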