nats-io / nats-streaming-server

NATS Streaming System Server
https://nats.io
Apache License 2.0

Where's the section for the write buffer cache timing? #613

Closed unisqu closed 6 years ago

unisqu commented 6 years ago

I saw the response below. I would like to modify the code so that there can be a write buffer cache that flushes to disk after 30 seconds or so (adjustable). Can you please point me to this section? It would be great if it could be a configurable value.

@kozlovic We used to keep all stored messages in memory in older releases. Now there is a cache, but stored messages are evicted from that cache within a second. On top of that, there is a write buffer that is used to minimize disk writes. This buffer is flushed to disk when the server processes a set of incoming messages (in the ioLoop). This buffer can be disabled with -file_buffer_size 0. But even when the buffer is in use, it is shrunk when not needed. Of course, if you have tons of channels, then each channel will have a buffer; even if each one is small, it can add up.
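
For illustration, here is a toy Go sketch of that idea (it is not the actual filestore code): a per-channel write buffer collects a batch of messages in memory, and one flush pushes the whole batch to disk.

    package main

    import (
        "bufio"
        "fmt"
        "os"
    )

    // channelStore is a stand-in for a channel's message store: it owns a
    // buffered writer so that several messages become a single disk write.
    type channelStore struct {
        buf *bufio.Writer
    }

    func main() {
        f, err := os.CreateTemp("", "msgs-*.dat")
        if err != nil {
            panic(err)
        }
        defer os.Remove(f.Name())
        defer f.Close()

        // A 32 KB write buffer stands in for the server's per-channel buffer
        // (which -file_buffer_size controls in the real server).
        cs := &channelStore{buf: bufio.NewWriterSize(f, 32*1024)}

        // A batch of incoming messages is laid out in the memory buffer...
        batch := [][]byte{[]byte("msg 1\n"), []byte("msg 2\n"), []byte("msg 3\n")}
        for _, m := range batch {
            if _, err := cs.buf.Write(m); err != nil {
                panic(err)
            }
        }

        // ...and a single Flush at the end of the batch persists them all.
        if err := cs.buf.Flush(); err != nil {
            panic(err)
        }
        fmt.Println("batch persisted with one buffered flush")
    }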

kozlovic commented 6 years ago

@unisqu I see that you have created several issues this weekend but then closed them. I hope answering this one will also clarify the questions you had in the other issues.

When the server gets a message from a publisher, it needs to persist it before sending the ACK back to the publisher. That is the contract. Publishers can use asynchronous calls so that they don't block waiting for the ACK, but they will still get that ACK back, and only after the message has been persisted.
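
For illustration, a minimal asynchronous publisher with the Go streaming client (stan.go) could look like the sketch below; the cluster ID, client ID, and subject are placeholders. The ack handler only fires once the server has persisted the message.

    package main

    import (
        "log"

        stan "github.com/nats-io/stan.go"
    )

    func main() {
        // Cluster ID and client ID are placeholders for this sketch.
        sc, err := stan.Connect("test-cluster", "example-publisher")
        if err != nil {
            log.Fatal(err)
        }
        defer sc.Close()

        acked := make(chan struct{})

        // PublishAsync returns immediately; the handler is invoked with the
        // message GUID once the server has persisted and acked the message.
        guid, err := sc.PublishAsync("logs", []byte("hello"), func(ackedGUID string, err error) {
            if err != nil {
                log.Printf("publish %s failed: %v", ackedGUID, err)
            } else {
                log.Printf("message %s persisted and acked", ackedGUID)
            }
            close(acked)
        })
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("published %s, not blocking on the ack", guid)

        <-acked // wait for the outstanding ack before closing
    }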

So the write buffer cache has no effect on a single produced message; that message will be persisted before the ACK is sent back. The only advantage of a bigger write buffer size is that when many published messages are processed by the server at once (either because of an async publisher or multiple publishers at the same time), the server may be able to issue a single disk write call, because those messages were laid out in a memory buffer first.

Consuming messages is typically slower than producing them, so when the server gets a message and persists it, it will also try to send it to matching subscribers, and will do so before sending the ACK back to the publisher. The advantage is that if the message can be sent to the subscriptions right away, looking it up is fast because it is likely still in the small cache of the filestore implementation. If the server were to send the ACKs back to publishers first, and only then try to send to subscriptions, the subscriptions could fall behind more quickly and then require a disk read to look up the messages that need to be sent. In other words, there is no incentive for the server to make the producers faster when there are matching subscriptions.

Hope this helps clarify some of the questions you had.

unisqu commented 6 years ago

This is a fantastic explanation that clears all doubts I have.

However, I would like to have the ACK sent back to the publisher before the message goes to the subscriber, because my subscribers can wait but my producers can't wait for the ACK.

I'm using NATS Streaming Server as a logging system. My producers require heavy writes, and slow reads are fine. Can you show me the section where I can edit this part? Thanks a lot.

kozlovic commented 6 years ago

Let's be clear: even if the server were to send that ack back first, it would still need to trigger the send to subscriptions, and at the moment this happens in the ioLoop, so ultimately publishers would still be slowed down.

If you want to experiment, this is where the server "flushes" the written messages to disk: https://github.com/nats-io/nats-streaming-server/blob/master/server/server.go#L3746 and here it replies to publishers: https://github.com/nats-io/nats-streaming-server/blob/master/server/server.go#L3765

unisqu commented 6 years ago

Based on your experience, if I swapped their positions, do you think there would be a problem, other than a very small possibility of losing actual written data in the event of a server shutdown caused by an electrical power failure?

I can't see any major unforeseen problem... I've read it three times.

        // Ack our messages back to the publisher
        for i := range pendingMsgs {
            iopm := pendingMsgs[i]
            s.ackPublisher(iopm)
            pendingMsgs[i] = nil
        }

        // clear out pending messages
        pendingMsgs = pendingMsgs[:0]

        // flush all the stores with messages written to them...
        for c := range storesToFlush {
            if err := c.store.Msgs.Flush(); err != nil {
                // TODO: Attempt recovery, notify publishers of error.
                panic(fmt.Errorf("Unable to flush msg store: %v", err))
            }
            // Call this here, so messages are sent to subscribers,
            // which means that msg seq is added to subscription file
            s.processMsg(c)
            if err := c.store.Subs.Flush(); err != nil {
                panic(fmt.Errorf("Unable to flush sub store: %v", err))
            }
            // Remove entry from map (this is safe in Go)
            delete(storesToFlush, c)
            // When relevant, update the last activity
            if c.activity != nil {
                c.activity.last = time.Unix(0, c.lTimestamp)
            }
        }
kozlovic commented 6 years ago

You lose the guarantee that the messages have been persisted and break the contract. You also don't gain much: although you are sending the ACK back in a given loop iteration before flushing and sending messages to consumers, as you can see above, new published messages are not processed until after the second for loop executes.

unisqu commented 6 years ago

What about this way then? Sorry for the question. I'll be a heavy user and will definitely be useful to this project in the future. Will this work? I've been testing the NATS Streaming Server for the past few days and have written a lot of sidecars to go with it.

    // flush all the stores with messages written to them...
    for c := range storesToFlush {
        if err := c.store.Msgs.Flush(); err != nil {
            // TODO: Attempt recovery, notify publishers of error.
            panic(fmt.Errorf("Unable to flush msg store: %v", err))
        }
        // Call this here, so messages are sent to subscribers,
        // which means that msg seq is added to subscription file
    }

    // Ack our messages back to the publisher
    for i := range pendingMsgs {
        iopm := pendingMsgs[i]
        s.ackPublisher(iopm)
        pendingMsgs[i] = nil
    }

    // clear out pending messages
    pendingMsgs = pendingMsgs[:0]

    for c := range storesToFlush {
        s.processMsg(c)
        if err := c.store.Subs.Flush(); err != nil {
            panic(fmt.Errorf("Unable to flush sub store: %v", err))
        }
        // Remove entry from map (this is safe in Go)
        delete(storesToFlush, c)
        // When relevant, update the last activity
        if c.activity != nil {
            c.activity.last = time.Unix(0, c.lTimestamp)
        }
    }
unisqu commented 6 years ago

I sort of understand why it's written this way. However, somehow I feel there's a better logic flow to this, at least in a "write-back" cache implementation of sorts. This part needs some configurable options...

Thanks by the way... Appreciate it.

kozlovic commented 6 years ago

You are ensuring that messages are flushed before ack'ing publishers, which is better. But like I said, it won't help that much. Experiment and you will see.

Also, if you plan to be a heavy user like you say, it may not be wise to maintain your own modifications. If you later have an issue, we may not be able to help, since that would not be the same code that is in this project.

kozlovic commented 6 years ago

So if you try it and see improvements, and the new behavior meets your performance requirements, then we could add an "ack before processing" type of feature to the project.
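
Purely as an illustration of what such a toggle could look like (none of these option or function names exist in the project), the ioLoop ordering might be guarded by a setting along these lines:

    package main

    import "fmt"

    // Hypothetical option for this sketch only: choose whether publishers are
    // acked right after the flush or only after delivery to subscriptions.
    type ioLoopOpts struct {
        AckPublishersBeforeDelivery bool
    }

    func flushMsgStores() { fmt.Println("flush message stores to disk") }
    func ackPublishers()  { fmt.Println("ack pending publishers") }
    func deliverToSubs()  { fmt.Println("send messages to subscriptions, flush sub stores") }

    // processBatch shows the two orderings discussed in this issue. Both
    // persist the messages before acking, so the contract is kept either way.
    func processBatch(o ioLoopOpts) {
        flushMsgStores()
        if o.AckPublishersBeforeDelivery {
            // Publishers get their ACK as soon as the data is on disk.
            ackPublishers()
            deliverToSubs()
        } else {
            // Behavior described earlier in the thread: deliver first, then ack.
            deliverToSubs()
            ackPublishers()
        }
    }

    func main() {
        processBatch(ioLoopOpts{AckPublishersBeforeDelivery: true})
    }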

ripienaar commented 6 years ago

From my perspective as a non-NATS employee, I'd say NATS is fine as it is; not every tool has to fill every role. Stream processors have a specific niche, and that niche has specific affordances that streaming is built around. The way it's coded serves a set of guarantees that are needed, perhaps not by you.

There are better log management tools than a streaming server, at some point you need to realise you are not picking the right tool for the job.

kozlovic commented 6 years ago

@ripienaar Agreed, and from the questions you (@unisqu) opened this weekend, it indeed sounds like NATS Streaming would not be a good fit (performance requirements, current design behavior, etc.).

unisqu commented 6 years ago

@kozlovic , @ripienaar

Both of you are right in your own way.

One last question for @kozlovic: does anything break with the

          s.ackPublisher(iopm)

sandwiched between them? I'm just curious whether it will break anything, that's all. If not, this logic is actually the best fit for my use case (for the moment):

// flush all the stores with messages written to them...
for c := range storesToFlush {
    if err := c.store.Msgs.Flush(); err != nil {
        // TODO: Attempt recovery, notify publishers of error.
        panic(fmt.Errorf("Unable to flush msg store: %v", err))
    }
    // Call this here, so messages are sent to subscribers,
    // which means that msg seq is added to subscription file
}

// Ack our messages back to the publisher
for i := range pendingMsgs {
    iopm := pendingMsgs[i]
    s.ackPublisher(iopm)
    pendingMsgs[i] = nil
}

// clear out pending messages
pendingMsgs = pendingMsgs[:0]

for c := range storesToFlush {
    s.processMsg(c)
    if err := c.store.Subs.Flush(); err != nil {
        panic(fmt.Errorf("Unable to flush sub store: %v", err))
    }
    // Remove entry from map (this is safe in Go)
    delete(storesToFlush, c)
    // When relevant, update the last activity
    if c.activity != nil {
        c.activity.last = time.Unix(0, c.lTimestamp)
    }
}
kozlovic commented 6 years ago

That's fine, it still honors the contract that the server has persisted the messages before sending the ACK back to the publisher. It then proceeds to send to matching subscriptions, etc.

unisqu commented 6 years ago

@kozlovic I only have one subscriber and multiple publishers. You have been a great help! Do mention what I can do to help in the future. Appreciate it.

kozlovic commented 6 years ago

No problem! Thanks

unisqu commented 6 years ago

For what it's worth, with the code modification I see a 20% improvement in delivery... it comes down to the way my code is written, so it's only applicable to me.

Briefly stated, as a logging system this modification is great too; in fact it's a fantastic modification for a distributed logging system. Of course, a lot of other code needs to wrap around the NATS Streaming Server to make what I mentioned meaningful, but as it is, I'm very satisfied with the modded code. Maybe in the future others will see the value I mentioned here.

A 20%-30% speed-up is seen in publisher acknowledgement (this only applies to my application). That is important by itself.

I hope a future code change can really add this as a feature, because the rationale is:

  1. Publishers (a single one or multiple ones) need the message acknowledged faster so they can carry on with other things; subscribers can wait.

Actually, this one point is all that matters: durable write and forget.

kozlovic commented 6 years ago

Good. I may then add this as a configurable behavior. Thanks for the feedback.

unisqu commented 6 years ago

Out of curiosity and as a thought experiment, one can switch the positioning of the subscriber vs. publisher priorities irrespective of memory or file storage mode.

NSSF = Nats-Streaming-Server File Storage
NSSM = Nats-Streaming-Server Memory Storage (durable mode)
CC = Customized Connectors (or very slow subscribers)

(Write heavy side) ================================================= (Read heavy side)
Multiple Publishers -> NSSF -> CC over long distance or LAN or goroutine memory -> NSSM -> Multiple Subscribers
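
If anyone wants to experiment with that layout, a hypothetical CC relay could be sketched with the Go streaming client roughly as below; the cluster IDs, NATS URLs, client IDs, and subject are all invented for the example.

    package main

    import (
        "log"

        stan "github.com/nats-io/stan.go"
    )

    func main() {
        // File-backed side (NSSF): write heavy, keeps the backlog on disk.
        fileSide, err := stan.Connect("nssf-cluster", "cc-reader",
            stan.NatsURL("nats://nssf-host:4222"))
        if err != nil {
            log.Fatal(err)
        }
        defer fileSide.Close()

        // Memory-backed side (NSSM): serves the read-heavy subscribers.
        memSide, err := stan.Connect("nssm-cluster", "cc-writer",
            stan.NatsURL("nats://nssm-host:4222"))
        if err != nil {
            log.Fatal(err)
        }
        defer memSide.Close()

        // Durable, manually acked subscription: the relay only acks a message
        // on the file side once the memory side has accepted it, so the relay
        // (or a very slow subscriber) can safely fall behind.
        _, err = fileSide.Subscribe("logs", func(m *stan.Msg) {
            if err := memSide.Publish("logs", m.Data); err != nil {
                log.Printf("relay publish failed: %v", err)
                return
            }
            m.Ack()
        }, stan.DurableName("cc-relay"), stan.SetManualAckMode())
        if err != nil {
            log.Fatal(err)
        }

        select {} // run the relay until the process is stopped
    }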

It's late for me; I'm sleepy and can't think well, but I hope this may inspire anyone reading.

Thanks again.