redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

RFC: Benthos Lambda changes #207

Closed kconwayatlassian closed 5 years ago

kconwayatlassian commented 5 years ago

I've been tinkering with the Lambda binary and have a few things I'd like to run by folks for thoughts. I'm happy to split this into multiple issues for multiple threads if we need to. To demo things, I've put together https://github.com/kconwayatlassian/benthos-ext, which implements each of the ideas below:

I'm looking for feedback on 1) whether any of the ideas have value beyond my team that would justify having them in Benthos, and 2) whether the demo code would be acceptable as a Benthos contribution, or how I'd need to refactor it to meet the project's standards/style.

Jeffail commented 5 years ago

Hey @kconwayatlassian, code looks great. I'd like to get all of these features merged in but I'd also like to take a slightly different approach to implementing the combination of outputs.

We could add a new output plugin (exclusive to the lambda distribution) called something like lambda_response, which will be the default output type (instead of stdout) for a lambda deployment. The implementation of this output type would then write to a global buffer which gets checked for content after every execution. This would save us from needing to manually walk the broker tree for stdout types and would also allow you to configure specific output processors for the response:

pipeline:
  processors:
  - type: http
    http:
      ...
output:
  type: broker
  broker:
    pattern: fan_out
    outputs:
    - type: kinesis
      kinesis:
        ...
    - type: lambda_response
      processors:
      - type: jmespath
        jmespath:
          query: "document" # Only send a field of our result back in the response

I can take a look at implementing this part ^ next time I'm free. A PR for the code export and test coverage part would be great but we can take a look at that again afterwards.

kconwayatlassian commented 5 years ago

I like the dedicated output idea, especially with the custom processor support. A couple of questions/comments for it.

Jeffail commented 5 years ago
kconwayatlassian commented 5 years ago

@Jeffail I know you've been busy with the beta branch and config changes. Have you had any time to think some more about this?

Jeffail commented 5 years ago

@kconwayatlassian, I've iterated through a few different ideas; the closest I can get to something deliverable with minimal hackiness would be for each call to insert a "caller ID" into the input message payloads (via their context) that indicates, at the output level, where to store these synchronous responses. Once the ack is received by the caller it can check some global shared structure for the result and clean it up.

The risk with something like this is that custom processor plugins might wipe the caller ID by wrapping or removing the message context, in which case the output would be lost. I think that's an acceptable trade-off since we can log and emit metrics for those cases.
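
For illustration, a minimal sketch of that coordinator idea, assuming a hypothetical global store keyed by caller ID (none of these names are Benthos APIs):

package serverlessid

import "sync"

// pending is the global shared structure: the output stores captured message
// parts under the caller ID it finds on the message context, and the invoking
// handler claims and deletes its entry once the ack arrives.
var pending sync.Map // caller ID -> [][]byte

func storeResult(callerID string, parts [][]byte) {
	pending.Store(callerID, parts)
}

func claimResult(callerID string) ([][]byte, bool) {
	v, ok := pending.LoadAndDelete(callerID)
	if !ok {
		return nil, false // e.g. a processor wiped the caller ID from the context
	}
	return v.([][]byte), true
}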

I'd like to take another week to possibly think this through more but I'm confident it can work.

kconwayatlassian commented 5 years ago

Sounds good. I'll check back in around that time.

In the meantime, I'll make one more pitch for the config pre-processor solution combined with channel management, rather than the global coordinator. Personally, I find the linear flow of using the pipeline and output channels easier to think about than using global state to coordinate message delivery.

I may be mistaken, but my understanding of the types.Pipeline contract is that each instance processes messages synchronously. That is, once the pipeline reads a value from the input channel two things follow: 1) no other values are read from the input channel, which effectively blocks any other goroutines attempting to send a message through the pipeline, and 2) the only possible output from the pipeline is a result for the message it previously read. At least, that's what I see when I read through https://github.com/Jeffail/benthos/blob/master/lib/pipeline/processor.go#L81. If that's the case then writing to and then reading from a channel seems like a pretty clear way to manage getting the processed value(s). I put together an annotated demo of what this could look like here if it helps illustrate: https://github.com/kconwayatlassian/benthos-ext/blob/master/pkg/producer.go
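
To make that flow concrete, here's a tiny hypothetical sketch (not the actual types.Pipeline API): the caller sends on the pipeline's input channel and then blocks on its output channel, relying on the one-message-at-a-time behaviour described above:

package serverlesschan

// message stands in for a batch of raw message parts.
type message [][]byte

// invoke hands a message to the pipeline's input channel and then blocks on
// the output channel; because the pipeline only emits a result for the message
// it last read, the value received corresponds to the value just sent.
func invoke(in chan<- message, out <-chan message, m message) message {
	in <- m
	return <-out
}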

At least one downside to the approach I'm pitching is that it effectively limits the processing concurrency of a serverless instance to one, even if multiple invocations can send to the output concurrently. While this is likely a non-issue for Lambda or GCP, it will put a constraint on Azure Functions, which allow concurrent calls to the same function instance. It would be fairly easy to convert my demo's use of a single set of channels into a pool that supports up to N (configurable) concurrent message processors so that it could match the Azure settings. From your descriptions, I believe this is the specific problem you are hoping to solve with the coordinator: enabling concurrent pipeline usage.

Ultimately, I'm open to any solution you come up with that enables the feature and fits your vision for the project. I just wanted to get these ideas out in case they help spark any new thoughts. :thinking:

Jeffail commented 5 years ago

Thanks @kconwayatlassian, I think that's a cleaner solution but there's a fair amount of complexity needed to walk any possible tree of output brokers. For example, if someone were loopy enough to configure the following:

output:
  type: broker
  broker:
    pattern: fan_out_sequential
    outputs:
    - type: foo
      processors:
      - type: bar
    - type: broker
      broker:
        pattern: fan_out
        outputs:
        - type: stdout
          processors:
          - type: baz
        - type: stdout
          processors:
          - type: qux

Then we'd either need to accept that the output behaviour will be different to what the author would expect, or we'd need to write an output walker that catches all possible arrangements and knows to reconstruct the individual processing steps for each instance of stdout. This also opens us up to regressions as new features and brokering types are added.

Doing it with custom output types handles all the possible output arrangements someone might concoct both now and in the future. We can continue treating the stream pipeline as a black box, and if one day I bake in a cleaner mechanism for multiplexing messages back to the source we can swap out the hacky part.

kconwayatlassian commented 5 years ago

If it helps, I put together a demo of using the Manager as the coordinator between a custom output and a serverless function: https://github.com/kconwayatlassian/benthos-ext/pull/1 . At least, this is how I've been interpreting your idea.

The outline is:

Jeffail commented 5 years ago

@kconwayatlassian, looks good, pretty much what I had in mind. The only adjustment I'd make is to use message contexts rather than wrapping the message type: https://godoc.org/github.com/Jeffail/benthos/lib/message#GetContext.

So something like:

ctx := context.WithValue(message.GetContext(p), "serverlessID", serverlessID)
p = message.WithContext(p, ctx)

kconwayatlassian commented 5 years ago

@Jeffail One edge case that stands out in this approach is that an error within the publishing pipeline, or a simple failure to fetch the result, can leak unclaimed results in the manager registry. There's a different technique we might use here where we set the context value to a pointer type, so that later code can fill in the value behind the pointer. For example: https://play.golang.org/p/G2CGB5MDju5

This would tie the captured message to the lifetime of the context rather than to a global map. I believe this would also remove the need to perform any coordination across components via the manager, by isolating the communication to the context object. Granted, we'd need to be extra careful with manipulating the pointer values, but I believe this may be a reasonable alternative to consider.
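
As a rough sketch of that pointer-in-context technique (hypothetical names, not taken from the linked playground or from Benthos):

package serverlessctx

import "context"

type resultKey struct{}

// result is the value whose pointer is stashed in the context before
// processing; its lifetime is tied to the invocation rather than a global map.
type result struct {
	parts [][]byte
}

// withResult returns a child context carrying a fresh *result, plus the
// pointer the caller keeps so it can read the captured payload afterwards.
func withResult(ctx context.Context) (context.Context, *result) {
	r := &result{}
	return context.WithValue(ctx, resultKey{}, r), r
}

// setResult is what a serverless output would call; it reports false when the
// pointer has been lost, e.g. a processor replaced the message context.
func setResult(ctx context.Context, parts [][]byte) bool {
	r, ok := ctx.Value(resultKey{}).(*result)
	if !ok {
		return false
	}
	r.parts = parts
	return true
}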

Jeffail commented 5 years ago

@kconwayatlassian, I like that, it seems much simpler. We also need to take into account that multiple serverless outputs might be configured that will all try to set the output message, and if a batch is created and split then they might want to add multiple types.Message. It might be worth adding a thread-safe structure to the context rather than just a pointer, something that allows outputs to safely append a new types.Message.
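
A minimal sketch of what such a thread-safe structure might look like (illustrative only, continuing the hypothetical names above):

package serverlessctx

import "sync"

// resultStore replaces the bare pointer with something several serverless
// outputs (or split batches) can append to concurrently.
type resultStore struct {
	mu      sync.Mutex
	batches [][][]byte // one entry per captured batch of message parts
}

func (s *resultStore) Append(batch [][]byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.batches = append(s.batches, batch)
}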

kconwayatlassian commented 5 years ago

@Jeffail This brings up another case I hadn't considered. I suppose this would happen if someone applied per-output processors as part of a broker setup. We can definitely make a concurrency-safe mechanism for adding a message.

The current serverless implementation returns a single message part if there is only one to return ({}). If there are multiple parts then it returns an array of all parts ([{},{},...]). Would there be a new format for multi-message outputs or should we collapse all of the parts into a single array?

Jeffail commented 5 years ago

@kconwayatlassian oh, that's a good question. I think since we're already dynamically setting the output format based on the number of messages in a batch, we've already set a precedent for outputting multiple batches as arrays within arrays. So that would make our possible output formats look like this:

- Single message of a single batch: {} (a JSON object)
- Multiple messages of a single batch: [{},{}] (an array of JSON objects)
- Multiple batches: [[{},{}],[{}]] (an array of arrays of JSON objects; batches of size one are a single-object array in this case)

This is all pretty dirty since it's possible that a pipeline might output any of these three formats depending on the input data. Collapsing into a single array is definitely an option but that's going to obfuscate the result and break certain use cases. Since we're already going out of our way to support every possible arrangement I think that would be a mistake.

I think I'm happier with setting these three possible output formats for now, if it ends up being cumbersome for users we can revisit the possibility of flags for explicitly choosing the format.
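
For illustration, a hypothetical sketch of how those three response shapes could be selected from the captured batches (the names here are made up, not the serverless package's actual code):

package serverlessfmt

import "encoding/json"

// buildResponse picks one of the three shapes discussed above from the
// captured batches: {}, [{},{}], or [[{},{}],[{}]].
func buildResponse(batches [][]json.RawMessage) interface{} {
	switch {
	case len(batches) == 1 && len(batches[0]) == 1:
		return batches[0][0] // single message of a single batch
	case len(batches) == 1:
		return batches[0] // multiple messages of a single batch
	default:
		return batches // multiple batches; a batch of one remains a single-object array
	}
}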

kconwayatlassian commented 5 years ago

@Jeffail One potential issue with this is that the array results are not guaranteed to be in a deterministic order if there are concurrent outputs via a fan-out. My team doesn't yet have a use case where our broker layouts are complex enough that we'd encounter this issue, but my primary reason for having the values returned in the single and multi-part cases is so that we can use the response in the lambda caller.

I'm not sure what someone with a multi-output setup needs, but I'm assuming they'll want to isolate data from different parts of the pipeline. What would you think of using a map as the type and giving the serverless output an optional configuration value, name (or something like it), that would represent a key in the resulting JSON object? We could have it default to serverless0, serverless1, etc., to guarantee uniqueness in the default case and allow users to override with their desired keys: {"myPipeline": [{},{}], "myOtherPipeline": [{}]}.

It's still not ideal from a consistency perspective, but it would have the internal reasoning of: "If you have only one serverless output then the result is exactly what it received. If you have more than one serverless output then you get a keyed map of arrays." I suppose this would also create an opportunity for misconfiguration that causes data loss in the response, unless we can validate that all the names are unique.

Jeffail commented 5 years ago

I like that idea, but we can add it afterwards; as long as it's a configurable field on the serverless output type there's a backwards-compatible upgrade path. For now I would be happy simply recommending fan_out_sequential rather than fan_out when the ordering of output batches needs to be deterministic.

kconwayatlassian commented 5 years ago

Sounds good. I'll let these ideas bake for a bit and ping you later this week to see how you feel about moving forward. I'm happy to have you roll this out if you have a specific idea of how you want it done. I'm also happy to make PRs and go through some rounds of feedback until things look good.

Jeffail commented 5 years ago

Hey @kconwayatlassian, I've been working on a WASM build of Benthos that runs in the browser for experimentation and I intend to use a very similar mechanism for capturing the output. My plan is to use this to experiment and whatever comes out the other end we can add to the lambda pipeline.

kconwayatlassian commented 5 years ago

Sounds good. I'll try to track your work on that. One thing that stands out about the current result store implementation is that it doesn't key the results by anything, so in the case of multiple writers to the same store their results will be interleaved.

As an alternative, I backed the result store with a sync.Map in my prototype (https://github.com/kconwayatlassian/benthos-ext/blob/master/pkg/responses.go#L35) and used it to implement all the various output formats we talked about (https://github.com/kconwayatlassian/benthos-ext/blob/master/pkg/producer.go#L141).

I set things up so that the results are interleaved by default but can be keyed if you add a "name" parameter in the config.
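
As a rough illustration of that keyed approach (an assumed shape, not the prototype's actual code):

package serverlessnamed

import "sync"

// resultList serialises appends for a single named output.
type resultList struct {
	mu    sync.Mutex
	parts [][]byte
}

// namedResults keys captured parts by the output's configured name so that
// concurrent serverless outputs don't interleave their results.
type namedResults struct {
	byName sync.Map // name -> *resultList
}

func (n *namedResults) Add(name string, parts [][]byte) {
	v, _ := n.byName.LoadOrStore(name, &resultList{})
	list := v.(*resultList)
	list.mu.Lock()
	list.parts = append(list.parts, parts...)
	list.mu.Unlock()
}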

Jeffail commented 5 years ago

Got it on a branch now: https://github.com/Jeffail/benthos/tree/feature/advanced-serverless
Preliminary docs: https://github.com/Jeffail/benthos/blob/feature/advanced-serverless/docs/serverless/lambda.md#running-a-combination

I'm kind of comfortable with the idea of mixing the outputs together as it follows the same logic as having multiple stdout outputs. If a user is hellbent on partitioning the results at the end of the pipeline then they can always configure processors to add identifiers to the data in whatever way is needed, or organise the output layer to emit in the ordering they need.

I'll think it over some more; no need to merge just yet. I can probably test this branch next week. Would you have any time to try it out yourself?

kconwayatlassian commented 5 years ago

I should have a chance to try it out this week. Will post back with the results.

kconwayatlassian commented 5 years ago

@Jeffail I was able to get a lambda stood up that used a broker where one of the outputs was serverless_response. It appears to be working as expected. I've only deployed something that has one message with one part and no splitting, so I'm only getting the single-value response. Seems promising to me, though.

Jeffail commented 5 years ago

Thanks @kconwayatlassian, I've played around with the batch sizing and it looks good. There were a few cases where certain processors were wiping message context, but I've fixed those now. My plan is to merge today, do some more verification, then make a release either today or tomorrow. I'm really happy with where this feature ended up; there are definitely some compromises, but I think it puts us in a good position for now.

I massively appreciate the time you put towards this; I could've easily made a nasty mess out of it without your insight.

Jeffail commented 5 years ago

Implemented: https://github.com/Jeffail/benthos/commit/31b06dff1b3ad2904a72c579d747fbb2e6181f1a
Released: https://github.com/Jeffail/benthos/releases/tag/v2.4.0

kconwayatlassian commented 5 years ago

@Jeffail Thanks for hearing my ideas out and getting this feature landed. I'm working to get this rolled out for us now. I'm pretty excited about the whole thing.