Open mohitmarwal opened 2 weeks ago
I'm facing a similar problem when using the elasticsearch output: I cannot handle the connection error, and the pipeline gets stuck in a loop. The config below never reaches the stdout output:
input:
  generate:
    mapping: 'root = {"hello": "world"}'
    interval: 1s
    count: 1
pipeline:
  processors:
    - log:
        level: INFO
        message: "processing event: ${!content()}"
output:
  broker:
    pattern: fan_out_sequential_fail_fast
    outputs:
      - elasticsearch:
          urls:
            - https://localhost:1234
          index: "my-index"
          id: ${!timestamp_unix()}
          max_retries: 1
          tls:
            enabled: true
            skip_cert_verify: true
          basic_auth:
            enabled: true
            username: elastic
            password: admin
          healthcheck: false
          sniff: false
        processors:
          - mapping: '{"message": "elasticsearch preprocessor", "timestamp": timestamp_unix()}'
      - stdout:
          codec: lines
        processors:
          - mapping: '{"message": "stdout preprocessor", "timestamp": timestamp_unix()}'
As the problem seems related to the BatchOutput interface, other components, such as aws_s3, behave in the same way.
@mohitmarwal when you say "previously", do you mean version 4.26.0? And is there an example you can provide where this behaviour shows up? For example, the following config behaves as I'd expect:
input:
  http_server: {}
output:
  http_client:
    url: example.com/not/going/to/work
Running curl http://localhost:4195/post -d "hello world" returns an error as expected.
@FerroEduardo this is a separate issue. Unfortunately, some components block traffic while they are failing in a connection loop, even when a DLQ is configured. There's an existing issue for this: https://github.com/benthosdev/benthos/issues/1210
@Jeffail The issue occurred when I updated from versions 4.24.0 and 4.25.1.
@Jeffail here is the pseudocode where the issue occurs:

package main

import (
	"context"
	"fmt"

	"github.com/benthosdev/benthos/v4/public/service"
)

type myBatchOutput struct {
	count int // Just one simple field
}

func (m *myBatchOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
	// Implement your WriteBatch method here
	for _, msg := range batch {
		// Process each message
		fmt.Println("Processing message:", msg)
	}
	return nil
}

func (m *myBatchOutput) Close(ctx context.Context) error {
	fmt.Println("disconnected:")
	return nil
}

func (m *myBatchOutput) Connect(ctx context.Context) error {
	// Implement your Connect method here
	return nil
}

func main() {
	// Initialize your BatchOutput implementation and use it as needed
	err := service.RegisterBatchOutput("my_batch_output", service.NewConfigSpec(), newMyBatchOutput)
	if err != nil {
		panic(err)
	}
	service.RunCLI(context.Background())
}

// The constructor takes pointer arguments to match service.BatchOutputConstructor.
func newMyBatchOutput(conf *service.ParsedConfig, mgr *service.Resources) (
	output service.BatchOutput,
	batchPolicy service.BatchPolicy,
	maxInFlight int,
	err error,
) {
	output = &myBatchOutput{
		count: 10, // Set a default value or configure from conf
	}
	// Assign a default max in flight
	maxInFlight = 10
	fmt.Println("newbatchoutputfunc:")
	return
}
input:
  type: http_server
  address: ":80"
  path: "/test"
  http_server:
    path: "/test"
    allowed_verbs:
pipeline:
  processors: []
output:
  type: plugin
  plugin:
    name: your_custom_plugin_name
output:
  type: stdout
@Jeffail in my setup even this doesn't return "hello world"; it gives me a request timeout error instead:
input:
  http_server: { path: /test }
output:
  http_client:
    url: https://httpbin.org/hidden-basic-auth/:user/:passwd
MINGW64 ~/Downloads/benthos_4.27.0_windows_amd64.tar/benthos_4.27.0_windows_amd64
$ curl -X POST http://localhost:4195/test -d "hello world"
Request timed out
Discussed in https://github.com/benthosdev/benthos/discussions/2548