tsaikd / gogstash

Logstash like, written in golang
MIT License
644 stars · 106 forks

Added discard filter #186

Open helgeolav opened 2 years ago

helgeolav commented 2 years ago

In this PR I have added a discard filter. This filter allows me to

  1. Discard an event from further processing when I know that I will not need it. Today this has to be done with a cond output, but the event still has to pass through every stage of processing before it is discarded.
  2. Discard the event when we have back-pressure issues. This can be handy if the output supports back pressure but the input does not, and discarding the event is your best option.

I also rewrote the filter-handling routine to allow an event to start at a specific filter instead of at the beginning of the list of filters. I think there are many good ways to use this. I am currently working on a filter whose goal is to identify and remove all events of a kind except the last one. To do this I need to discard all but the last event, which I then inject for further processing at the next filter in the pipeline.

Other good use cases for the discard code are logstash aggregate or logstash throttling.

tsaikd commented 2 years ago

For filter execution priority, I prefer control through the user's config instead of FilterPos, so users are more flexible to combine multiple components for their goals, even using the same filter multiple times and in multiple positions with different configs.

In your case, you can suggest in your README that users put the filter at the beginning.

helgeolav commented 2 years ago

Hi

I think I did not explain this well. FilterPos is a way to serve two purposes:

  1. being able to completely discard events in the filter chain, as shown with the discard filter.
  2. having a way for a filter to take an event out of processing so it can be continued later.

For my second point, I am working on a filter that is a bit like logstash aggregate. I am running this filter in test now, but currently as a standalone program (gogstash instance 1 -> my filter -> gogstash instance 2).

This filter looks at similar events (based on the value of a configurable key), queues them up in memory, and if there is more than one event with the same key, only the last is kept; the others are discarded. After some configured period of inactivity (no new events for that key value) the event is sent back into gogstash.

A workflow like this is not possible in gogstash without a mechanism like FilterPos. It is not my intention to use this field to jump over or skip parts of the chain; for that we have cond.

tsaikd commented 2 years ago

Thinking about this kind of filter:

  1. when the event arrives for the first time, keep it in memory; no output yet.
  2. when the next event arrives, check whether the same key already exists in memory:
     2-1. if the key exists: keep only the last event
     2-2. if the key does not exist: store it in memory
  3. flush memory on your configured time period.

helgeolav commented 2 years ago

The filter flow is single threaded (unless you have more workers), so I need a way to signal back that I want a message dropped. I cannot hold up the queue; all filters have to do their work as quickly as possible.

The way you described above is exactly the way I planned to implement it. Step 3 is the hardest, and to be able to flush I need a mechanism to send an event into the next filter in the filter chain.

I can’t see any good way to handle this other than being able to insert an event into the chain at a configurable spot.

tsaikd commented 2 years ago

There is only one input channel (chInFilter) and one output channel (chFilterOut) per process, and all filters run in one goroutine, so I think it's OK to treat it as a single-threaded model from the filter's point of view.

flush example, by FIFO:

    var eventPool ThreadSafeTreeWithTimeKey // member of the filter; events ordered by time
    timer := time.NewTimer(config.FlushDuration)
    for {
        <-timer.C // wait for the next flush tick
        // flush every queued event older than FlushBeforeDuration, oldest first (FIFO)
        processBeforeTime := time.Now().Add(-config.FlushBeforeDuration)
        for {
            eventPool.lock()
            event := eventPool.first()
            if event == nil || event.Time.After(processBeforeTime) {
                eventPool.unlock()
                break
            }
            eventPool.remove(event)
            eventPool.unlock()
            chFilterOut <- event
        }
        timer.Reset(config.FlushDuration)
    }
helgeolav commented 2 years ago

From the code snippet above I see that you are sending the event straight to the outputs. I was thinking more along the lines of letting all filters handle the event. Below is an example of a pipeline close to what I want to do, assuming that the input codec prepared all the fields for me.

Filter 1: discard

Look into the event and see if I want to continue on it or discard it. If I discard it no further processing is done on the server.

Filter 2: aggregate

This filter will look at each event and see if it is a continuation of a previous event. If it is - update the state. If not - create a new state.

A dispatcher will scan for events that are ready to be sent further along the chain. The dispatched event could be sent back to chFilterIn so it can be processed by filter 3 and filter 4.

With my PR I want to use the discard option to discard the event so it is not processed any further.

Filter 3: ip2location

Get IP information from the event.

Filter 4: hash

Make a hash of the event.

tsaikd commented 2 years ago

In your Filter 2: aggregate, it is the decision maker for discarding or not. So I think the better solution is to have the filter itself discard events (by not dispatching them to chFilterOut). For example, extend the return value of Filter.Event():

type FilterReturn struct {
    event   logevent.LogEvent
    stop    bool // do not process the next filter
    discard bool // discard the event
    // more flow control functions
}

-func (TypeFilterConfig).Event(context.Context, logevent.LogEvent) (logevent.LogEvent, bool)
+func (TypeFilterConfig).Event(context.Context, logevent.LogEvent) FilterReturn

Why not extend the control properties in logevent.LogEvent instead? Because they are not used by the input/output plugins.