redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

Support a new streams configuration source based on a KV subscription (NATS) #1345

Open albinou opened 2 years ago

albinou commented 2 years ago

Hi,

Because of the following needs:

We'd like to implement a new streams configuration source that would make Benthos read and update its internal streams configuration from a KV store (NATS).

In other words:

  1. at startup: Benthos would read the KV store to get its initial streams configuration.
  2. during its lifetime: when a KV value is updated, Benthos would update the corresponding stream configuration accordingly.
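The two steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: `kvEvent` and `streamSet` are hypothetical stand-ins (not Benthos or NATS types), the KV store is simulated with a map and a slice of events, and stream configurations are plain strings.

```go
package main

import (
	"fmt"
	"sort"
)

// kvEvent is a hypothetical stand-in for one update delivered by a KV watcher.
type kvEvent struct {
	Key     string // stream ID
	Value   string // raw stream configuration
	Deleted bool   // true when the key was removed from the store
}

// streamSet is a hypothetical in-memory stand-in for the set of running streams.
type streamSet struct {
	configs map[string]string
}

func newStreamSet() *streamSet {
	return &streamSet{configs: map[string]string{}}
}

// load applies the initial snapshot read from the KV store (step 1).
func (s *streamSet) load(snapshot map[string]string) {
	for id, conf := range snapshot {
		s.configs[id] = conf
	}
}

// apply handles a single KV update (step 2) and reports the action taken:
// "create", "update", "delete" or "noop".
func (s *streamSet) apply(evt kvEvent) string {
	current, exists := s.configs[evt.Key]
	switch {
	case evt.Deleted:
		if !exists {
			return "noop"
		}
		delete(s.configs, evt.Key)
		return "delete"
	case !exists:
		s.configs[evt.Key] = evt.Value
		return "create"
	case current == evt.Value:
		return "noop"
	default:
		s.configs[evt.Key] = evt.Value
		return "update"
	}
}

// ids returns the known stream IDs in sorted order.
func (s *streamSet) ids() []string {
	out := make([]string, 0, len(s.configs))
	for id := range s.configs {
		out = append(out, id)
	}
	sort.Strings(out)
	return out
}

func main() {
	s := newStreamSet()
	s.load(map[string]string{"orders": "conf-v1"})

	// Simulated updates; a real source would receive these from a KV watcher.
	events := []kvEvent{
		{Key: "orders", Value: "conf-v2"},
		{Key: "payments", Value: "conf-v1"},
		{Key: "orders", Deleted: true},
	}
	for _, evt := range events {
		fmt.Println(evt.Key, s.apply(evt))
	}
	fmt.Println(s.ids())
}
```

Treating an unchanged value as a no-op avoids restarting a stream when the store redelivers a value it already sent.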

What do you think about this approach? Would it be a good addition to the project?

Thanks

Jeffail commented 2 years ago

Hey @albinou sure that sounds useful. I (relatively) recently added config live reloading and abstracted it so that hopefully we could use alternative methods such as etcd and other caches in the future, but haven't gotten around to actually doing it.

Ideally the implementation would be pluggable so that users importing benthos as a library aren't forced to bring in the respective client libraries.

grom-42 commented 2 years ago

Hello!

Coming back to this feature: we would like to write a plugin for it. :)

To add a little context on the goal of this change: we currently run Benthos in streams mode and add/delete streams using the HTTP API. To make our stream configurations persistent, and because we already use NATS in our infrastructure, we would like to manage stream configurations from a NATS KeyValue store.

The pseudo code of the plugin would be something like this:

func main() {
    service.RegisterStreamsManager(func(strmgr service.StreamsManager) {
        go func() {
            for {
                // Read Streams source events (new stream to create, stream to delete...).
                // ...
                switch evt {
                case StreamsSourceEvent.Add:
                    // ...
                    err := strmgr.Create(id, config)
                    // Handle error.
                case StreamsSourceEvent.Remove:
                    // ...
                    err := strmgr.Delete(context.Background(), id)
                    // Handle error.
                }
                // Exit condition.
            }
        }()
    })

    service.RunCLI(context.Background())
}

For this to work, though, Benthos itself needs a change. We prototyped a modification of the core that works for us: the handler registered by the plugin is called at the end of the initStreamsMode function and receives the streamMgr (*manager.Type) as its argument, which already implements the following service.StreamsManager interface:

type StreamsManager interface {
    Create(id string, conf stream.Config) error
    Delete(ctx context.Context, id string) error
}

Would this modification be acceptable to you, at least in principle, or do you have a better approach in mind?