redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

WriteBatch Method in BatchOutput Interface Does Not Return Errors Anymore #2549

Open mohitmarwal opened 5 months ago

mohitmarwal commented 5 months ago

Discussed in https://github.com/benthosdev/benthos/discussions/2548

Originally posted by **mohitmarwal** April 29, 2024

Previously, when using the BatchOutput interface in Benthos, the WriteBatch method returned an error if delivery was not possible. This was crucial for handling error scenarios and responding appropriately, especially in HTTP server contexts where the client expects meaningful error responses. For example, we previously received the message error in the input - httpServer: . Now this error is no longer returned to the HTTP server; instead, the request times out and the error is printed to the console in a loop.

func (o *BatchOutput) WriteBatch(ctx context.Context, batch types.MessageBatch) error {
    return fmt.Errorf("message error: %w", err)
}

However, with the latest update, it seems that the WriteBatch method does not return errors anymore. Instead, when an error occurs, the client receives a request timeout response, which is misleading and does not provide useful information about the actual error.

Expected Behavior: The WriteBatch method in the BatchOutput interface should return an error if delivery is not possible, as it did before. This error should be propagated back to the caller so that appropriate error handling can be performed, such as returning meaningful HTTP responses with error details.

Steps to Reproduce:

1) Use the BatchOutput interface in Benthos.
2) Implement a custom output that implements this interface.
3) Invoke the WriteBatch method with a scenario where delivery is not possible (e.g., invalid message format, connection failure).
4) Observe that instead of receiving an error, the client gets a request timeout response.

Impact: This change affects users who rely on the BatchOutput interface for writing batches of messages, especially in scenarios where error handling and response generation are critical, such as HTTP servers.

Proposed Solution: Restore the previous behavior of the WriteBatch method to return errors when delivery is not possible. This ensures consistency and enables users to handle error scenarios appropriately.

FerroEduardo commented 5 months ago

I'm facing a similar problem when using the elasticsearch output: I cannot handle the connection error, and the pipeline gets stuck in a loop. The config below never reaches the stdout output:

input:
  generate:
    mapping: 'root = {"hello": "world"}'
    interval: 1s
    count: 1

pipeline:
  processors:
    - log:
        level: INFO
        message: "processing event: ${!content()}"

output:
  broker:
    pattern: fan_out_sequential_fail_fast
    outputs:
      - elasticsearch:
          urls:
            - https://localhost:1234
          index: "my-index"
          id: ${!timestamp_unix()}
          max_retries: 1
          tls:
            enabled: true
            skip_cert_verify: true
          basic_auth:
            enabled: true
            username: elastic
            password: admin
          healthcheck: false
          sniff: false
        processors:
          - mapping: '{"message": "elasticsearch preprocessor", "timestamp": timestamp_unix()}'
      - stdout:
          codec: lines
        processors:
          - mapping: '{"message": "stdout preprocessor", "timestamp": timestamp_unix()}'
Output

INFO Running main config from specified file @service=benthos benthos_version=unknown path=test-elastic.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195 @service=benthos
INFO Input type generate is now active @service=benthos label="" path=root.input
INFO Output type elasticsearch is now active @service=benthos label="" path=root.output.broker.outputs.0
INFO Launching a benthos instance, use CTRL+C to close @service=benthos
INFO Output type stdout is now active @service=benthos label="" path=root.output.broker.outputs.1
INFO processing event: {"hello":"world"} @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: Post "https://localhost:1234/_bulk": dial tcp 127.0.0.1:1234: connect: connection refused @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"} @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: no available connection: no Elasticsearch node available @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"} @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: Post "https://localhost:1234/_bulk": dial tcp 127.0.0.1:1234: connect: connection refused @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"} @service=benthos label="" path=root.pipeline.processors.0
ERRO Failed to send message to elasticsearch: no available connection: no Elasticsearch node available @service=benthos label="" path=root.output.broker.outputs.0
INFO processing event: {"hello":"world"} @service=benthos label="" path=root.pipeline.processors.0

As the problem seems related to the BatchOutput interface, other components, such as aws_s3, behave in the same way.

Jeffail commented 5 months ago

@mohitmarwal when you say "previously", do you mean version 4.26.0? And is there an example you can provide where this behaviour shows up? For example, the following config behaves as I'd expect:

input:
  http_server: {}

output:
  http_client:
    url: example.com/not/going/to/work

Running curl http://localhost:4195/post -d "hello world" returns an error as expected.

@FerroEduardo this is a separate issue, unfortunately for some components when they are failing during a connection loop they will block traffic even when a DLQ is configured. There's an existing issue for this: https://github.com/benthosdev/benthos/issues/1210

mohitmarwal commented 4 months ago

@Jeffail The issue occurred when I updated from versions 4.24.0 and 4.25.1.

mohitmarwal commented 4 months ago

@Jeffail here is the pseudocode where the issue occurs:

package main

import (
    "context"
    "fmt"

    // Import path of the public plugin API in the benthosdev/benthos v4 module.
    "github.com/benthosdev/benthos/v4/public/service"
)

type myBatchOutput struct {
    count int // Just one simple field
}

// WriteBatch prints every message in the batch and reports success.
func (m *myBatchOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
    // Implement your WriteBatch method here
    for _, msg := range batch {
        // Process each message
        fmt.Println("Processing message:", msg)
    }
    return nil
}

// Close is called when the output shuts down.
func (m *myBatchOutput) Close(ctx context.Context) error {
    fmt.Println("disconnected:")
    return nil
}

// Connect is called before any batch is written.
func (m *myBatchOutput) Connect(ctx context.Context) error {
    // Implement your Connect method here
    return nil
}

func main() {
    // Initialize your BatchOutput implementation and use it as needed
    err := service.RegisterBatchOutput("my_batch_output", service.NewConfigSpec(), newMyBatchOutput)
    if err != nil {
        panic(err)
    }
    service.RunCLI(context.Background())
}

// newMyBatchOutput is the constructor passed to RegisterBatchOutput; the
// config and resources arguments are pointers, as the constructor type requires.
func newMyBatchOutput(conf *service.ParsedConfig, mgr *service.Resources) (
    output service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error,
) {
    output = &myBatchOutput{
        count: 10, // Set a default value or configure from conf
    }
    // Assign a default max in flight
    maxInFlight = 10
    fmt.Println("newbatchoutputfunc:")
    return
}

input:
  type: http_server
  address: ":80"
  path: "/test"
  http_server:
    path: "/test"
    allowed_verbs:

pipeline:
  processors: []

output:
  type: plugin
  plugin:
    name: your_custom_plugin_name

output:
  type: stdout

mohitmarwal commented 4 months ago

@Jeffail in my setup, even this doesn't return the expected error for the hello world request; it gives me a request timeout error instead:

input:
  http_server: { path: /test }

output:
  http_client:
    url: https://httpbin.org/hidden-basic-auth/:user/:passwd

MINGW64 ~/Downloads/benthos_4.27.0_windows_amd64.tar/benthos_4.27.0_windows_amd64
$ curl -X POST http://localhost:4195/test -d "hello world"
Request timed out

mohitmarwal commented 4 months ago

@Jeffail any updates on this? Please let me know if you need more information.

mohitmarwal commented 3 months ago

@Jeffail any updates on this, or are you planning to fix this in the near future?

mihaitodor commented 3 months ago

@mohitmarwal I'm really, really struggling to reproduce the issue you're seeing here. Can you please share a self-contained step-by-step example (with proper code blocks please) of the issue and which version works and which doesn't? Please make sure you share the full code of the custom plugin so I can just copy paste it, run go build and then run it myself locally. Don't use services like https://httpbin.org (or if you do, please make sure we have access to them).

mohitmarwal commented 1 month ago

Hi @mihaitodor, we can use open-source services such as https://httpbin.org/, since we cannot expose our organization's API publicly. It works perfectly and is accessible everywhere.

Follow these steps to reproduce:

1) Create a config.yaml file with the following content:

input:
  http_server: { path: /test }

output:
  http_client:
    url: https://httpbin.org/hidden-basic-auth/:user/:passwd

2) Run it in Benthos using ./benthos -c config.yaml

3) Open any PowerShell window and run the command curl -X POST http://localhost:4195/test -d "hello world"

4) You will get the output Request timed out instead of the expected:

https://httpbin.org/hidden-basic-auth/:user/:passwd: HTTP request returned unexpected response code (405): 405 Method Not Allowed, Error: <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">405 Method Not Allowed

Method Not Allowed

The method is not allowed for the requested URL.

Please try this and let me know