tsaikd / gogstash

Logstash like, written in golang
MIT License
644 stars 106 forks

Backpressure handling #175

Open helgeolav opened 3 years ago

helgeolav commented 3 years ago

We discussed this a bit in #135. This is my proposal for an interface that can be used to handle backpressure.

tsaikd commented 3 years ago

The case where the message channel is full can be handled with Go's native select syntax.

// func (t *InputModule) Start(ctx context.Context, msgChan chan<- logevent.LogEvent) (err error)
select {
case msgChan <- msg:
  // send log event normally
default:
  // the message channel is full
}
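
For reference, a minimal runnable sketch of the non-blocking send above. `LogEvent` here is a hypothetical stand-in for `logevent.LogEvent`, and `trySend` is an illustrative helper, not part of gogstash.

```go
package main

import "fmt"

// LogEvent is a hypothetical stand-in for gogstash's logevent.LogEvent.
type LogEvent struct {
	Message string
}

// trySend attempts a non-blocking send and reports whether the channel
// accepted the event. It never waits for a receiver.
func trySend(msgChan chan<- LogEvent, msg LogEvent) bool {
	select {
	case msgChan <- msg:
		return true // event was queued normally
	default:
		return false // channel buffer is full; caller decides what to do
	}
}

func main() {
	// A buffer of two with no consumer: the third send cannot be queued.
	msgChan := make(chan LogEvent, 2)
	for i := 0; i < 3; i++ {
		ok := trySend(msgChan, LogEvent{Message: fmt.Sprintf("event %d", i)})
		fmt.Printf("event %d accepted: %v\n", i, ok)
	}
}
```

With nothing draining the channel, the first two events are accepted and the third is rejected. The pattern only tells the sender that the buffer is full at that instant; it says nothing about why.
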

helgeolav commented 3 years ago

I don't think that pattern is good enough. There is no way for the output to signal back that there are issues that need attention, and the select pattern only covers the case where the channel is full and would block.

Let's use the HTTP output as an example. There are several possible outcomes from Output():

  1. The event was passed on. Everything is ok. This is the normal situation.
  2. An issue, such as the server being down or a loss of network connectivity, prevents the output from completing a three-way handshake. In this case the output blocks the channel for some time until it times out, an error is sent back, and the next event is served.
  3. Another issue where the web server is either stopped (the connection is reset immediately) or the receiving web app has an internal problem and returns an HTTP 5xx error.

In either of the failure cases gogstash will continue to process messages and they will be lost. The problem for the input is that it does not know when it is OK to continue receiving messages, so it will just have to retry at random intervals.
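
As an illustration, the three outcomes listed above could be told apart roughly like this. This is a sketch only: `Outcome`, `classify`, `timeoutErr` and `errReset` are hypothetical names made up for the example, and the real HTTP output may inspect errors differently.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// Outcome is a hypothetical classification of one delivery attempt.
type Outcome int

const (
	Delivered Outcome = iota // case 1: the event was passed on
	TimedOut                 // case 2: blocked until a timeout fired
	Rejected                 // case 3: immediate failure or HTTP 5xx
)

func (o Outcome) String() string {
	return [...]string{"delivered", "timed out", "rejected"}[o]
}

// timeoutErr simulates the net.Error a client returns on a timeout.
type timeoutErr struct{}

func (timeoutErr) Error() string   { return "i/o timeout" }
func (timeoutErr) Timeout() bool   { return true }
func (timeoutErr) Temporary() bool { return false }

// errReset simulates an immediate connection reset.
var errReset = errors.New("connection reset by peer")

// classify maps a status code and transport error onto the three cases.
func classify(statusCode int, err error) Outcome {
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return TimedOut
	}
	if err != nil || statusCode >= 500 {
		return Rejected
	}
	return Delivered
}

func main() {
	fmt.Println(classify(200, nil))
	fmt.Println(classify(0, timeoutErr{}))
	fmt.Println(classify(0, errReset))
	fmt.Println(classify(503, nil))
}
```

Only the first outcome is visible to the input today; the other two are known to the output alone, which is the gap this issue is about.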

While we are thinking about this: should there also be a Requeue() that allows the output to reschedule the event for a later time in case of issues?

helgeolav commented 3 years ago

I hope you see why I think we need a better kind of backpressure handling. I have been thinking about it, and I think it would be cleaner to move RequestPause() and RequestResume() into output.OutputConfig, and RegisterInput() into config.InputConfig, still using CanPause so the developer gets some freedom in how to implement it. This way it looks more «integrated» into gogstash.
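
A rough sketch of how these pieces could fit together. The names RequestPause, RequestResume, RegisterInput and CanPause come from this thread; the method signatures, the `Control` type and the request counting are assumptions made for illustration, not the proposed or merged interface.

```go
package main

import (
	"fmt"
	"sync"
)

// CanPause is a hypothetical shape for the interface named in the thread.
// An input implements it only if it can stop reading without losing data.
type CanPause interface {
	Pause()
	Resume()
}

// Control is a hypothetical coordinator shared by inputs and outputs.
type Control struct {
	mu            sync.Mutex
	inputs        []CanPause
	pauseRequests int // number of outputs currently asking for a pause
}

// RegisterInput records an input that supports pausing.
func (c *Control) RegisterInput(in CanPause) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.inputs = append(c.inputs, in)
}

// RequestPause is called by an output in trouble. Inputs are paused
// when the first outstanding request arrives.
func (c *Control) RequestPause() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pauseRequests++
	if c.pauseRequests == 1 {
		for _, in := range c.inputs {
			in.Pause()
		}
	}
}

// RequestResume withdraws one pause request. Inputs resume only when
// no output is asking for a pause any more.
func (c *Control) RequestResume() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.pauseRequests == 0 {
		return // nothing to resume
	}
	c.pauseRequests--
	if c.pauseRequests == 0 {
		for _, in := range c.inputs {
			in.Resume()
		}
	}
}

// demoInput is a toy input that just remembers its state.
type demoInput struct{ paused bool }

func (d *demoInput) Pause()  { d.paused = true }
func (d *demoInput) Resume() { d.paused = false }

func main() {
	in := &demoInput{}
	ctl := &Control{}
	ctl.RegisterInput(in)

	ctl.RequestPause()
	fmt.Println("paused:", in.paused)
	ctl.RequestResume()
	fmt.Println("paused:", in.paused)
}
```

Counting requests is one possible choice so that two struggling outputs do not resume the inputs while one of them is still failing. Inputs that do not implement CanPause are simply never registered and behave as they do today.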

I can write a new proposal on this if you want.

tsaikd commented 2 years ago

  1. An issue, such as the server being down or a loss of network connectivity, prevents the output from completing a three-way handshake. In this case the output blocks the channel for some time until it times out, an error is sent back, and the next event is served.
  2. Another issue where the web server is either stopped (the connection is reset immediately) or the receiving web app has an internal problem and returns an HTTP 5xx error.

Output modules can retry forever to prevent data loss. In other words, the error handling can be done in the output modules.
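
A minimal sketch of what retrying inside an output module might look like, assuming a hypothetical `send` callback in place of the real delivery call. `retryUntilDone` and `demoRetry` are illustrative names, and the backoff values are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryUntilDone keeps calling send until it succeeds or ctx is cancelled,
// doubling the wait between attempts up to maxDelay. It returns the number
// of attempts made.
func retryUntilDone(ctx context.Context, initial, maxDelay time.Duration, send func() error) (int, error) {
	delay := initial
	for attempt := 1; ; attempt++ {
		if err := send(); err == nil {
			return attempt, nil
		}
		select {
		case <-ctx.Done():
			return attempt, ctx.Err() // shutting down: give up
		case <-time.After(delay):
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

// demoRetry simulates a destination that fails a fixed number of times
// before accepting the event.
func demoRetry(failures int) (int, error) {
	calls := 0
	send := func() error {
		calls++
		if calls <= failures {
			return errors.New("HTTP 503")
		}
		return nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return retryUntilDone(ctx, time.Millisecond, 8*time.Millisecond, send)
}

func main() {
	attempts, err := demoRetry(3)
	fmt.Println(attempts, err)
}
```

Note that the output is blocked for as long as the loop runs, so whether events are lost in the meantime depends on what the inputs do when the message channel fills up.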

My questions:

  1. When the input module gets the RequestPause notification, how do you want it to react?
  2. How should we handle the case where some of the input/output modules do not support the pause function?

helgeolav commented 2 years ago

Hi

I disagree that the output modules can retry forever. At some point they will either drop the event or gogstash will run out of memory. In my case I am using gogstash to handle around 180k events per second, and if an output tries to queue everything it will not take long before I run out of memory and gogstash crashes.

Look at the HTTP output: it will drop the message right away (or after a timeout) and report an error. The elastic output will not report an error if the message has been received by olivere; it will just log asynchronously that there has been an error. The file output will drop the event if it for some reason cannot write to the file.

For your questions:

  1. I want to let the input decide for itself how to handle it. I will put a list of suggestions for each input below.
  2. If the input does not support Pause then it will work as it does today. Not all sources can pause without losing data.

For the inputs;

Implementing this is not an easy task as each input that we work on will take some time. My intention is to start on the framework and work on the inputs/outputs I know. Just make sure it works before we move on.

I also mentioned a Requeue() event. Providing something like this makes it easier for each output to requeue an event in case of a retry-able error, and have a common way to handle such errors.
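
To make the idea concrete, here is one possible shape for a shared Requeue() helper. Everything apart from the name Requeue is an assumption for illustration: the `Requeuer` type, the bounded queue, the attempt limit and the `LogEvent` stand-in are not part of gogstash.

```go
package main

import "fmt"

// LogEvent is a hypothetical stand-in for gogstash's logevent.LogEvent.
type LogEvent struct{ Message string }

// queuedEvent pairs an event with the number of delivery attempts so far.
type queuedEvent struct {
	event    LogEvent
	attempts int
}

// Requeuer is a hypothetical bounded retry queue shared by all outputs.
type Requeuer struct {
	queue       chan queuedEvent
	maxAttempts int
}

// NewRequeuer creates a queue holding at most capacity events, each of
// which may be attempted at most maxAttempts times.
func NewRequeuer(capacity, maxAttempts int) *Requeuer {
	return &Requeuer{
		queue:       make(chan queuedEvent, capacity),
		maxAttempts: maxAttempts,
	}
}

// Requeue schedules ev for another delivery attempt. It returns false,
// meaning the event is dropped, when the attempt limit is reached or
// the queue is full. It never blocks the calling output.
func (r *Requeuer) Requeue(ev LogEvent, attempts int) bool {
	if attempts >= r.maxAttempts {
		return false
	}
	select {
	case r.queue <- queuedEvent{event: ev, attempts: attempts + 1}:
		return true
	default:
		return false
	}
}

func main() {
	r := NewRequeuer(1, 3)
	fmt.Println(r.Requeue(LogEvent{Message: "a"}, 0)) // accepted
	fmt.Println(r.Requeue(LogEvent{Message: "b"}, 0)) // queue is full
	fmt.Println(r.Requeue(LogEvent{Message: "c"}, 3)) // attempts exhausted
}
```

Bounding both the queue size and the attempt count keeps memory use predictable at high event rates, at the cost of dropping events once either limit is hit.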

tsaikd commented 2 years ago

https://github.com/tsaikd/gogstash/commit/50d72c40aaf9147b4cc8ccac3e8dbb694640b8c6 Does the interface fit your requirements?

helgeolav commented 2 years ago

I had a quick look and it seems to work. I suggest we just give it a try and get some experience.

helgeolav commented 2 years ago

Is there anything else you want me to do here?

tsaikd commented 2 years ago

Up to you: keep it open for discussion or close it.

Could you help check whether daf188933d5495cc4cf2c2b24f4b32fbb3a59cac is working?

helgeolav commented 2 years ago

Hi

Sorry for the late answer. I have looked more into your control branch and implemented correct handling on two inputs in #178. I have tested both inputs by modifying outputstdout (that change is not committed) to pause for a few seconds after each message received.

Would it be possible to merge my change in and commit it to master?