redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

Setting a batching period policy blocks graceful termination #1001

Open disintegrator opened 2 years ago

disintegrator commented 2 years ago

It appears that configuring batching with a period policy blocks graceful termination of a Benthos process. Configuration and sample output are shown below.

When I remove the period policy and only have byte_size, I am able to safely terminate the Benthos process and observe that the pending batch is flushed correctly.


Config

metrics:
  prometheus:
    prefix: benthos
    path_mapping: ""

input:
  file:
    paths: [${DATA_PATH}/input/pypi/*.jsonl]
    codec: lines
  processors:
    - label: json_to_csv
      bloblang: |
        map escape_csv {
          root = if this.re_match("[\"\n,]+") {
            "\"" + this.replace("\"", "\"\"") + "\""
          } else {
            this
          }
        }

        meta = deleted()

        root = [
          (this.timestamp | "").string().apply("escape_csv"),
          (this.project | "").string().apply("escape_csv"),
          (this.url | "").string().apply("escape_csv"),
          (this.details.installer.name | "").string().apply("escape_csv"),
          (this.details.installer.version | "").string().apply("escape_csv"),
          (this.details.implementation.name | "").string().apply("escape_csv"),
          (this.details.implementation.version | "").string().apply("escape_csv"),
          (this.details.system.name | "").string().apply("escape_csv"),
          (this.details.system.release | "").string().apply("escape_csv")
        ].join(",")
output:
  broker:
    outputs:
      - drop: {}
        processors:
          - log:
              message: "dropping a batch"
    batching:
      period: 1m
      byte_size: 20_000_000
      processors:
        - insert_part:
            index: 0
            content: "timestamp,project,url,installer_name,installer_version,implementation_name,implementation_version,system_name,system_release"
        - archive:
            format: lines
        - bloblang: |
            meta batch_hash = content().hash("xxhash64").encode("hex")

Actual output

{"@timestamp":"2021-12-09T16:50:58Z","@service":"benthos","component":"benthos.input","level":"INFO","message":"Consuming from file '/data/input/pypi/pypi_results_000000000000.jsonl'"}
{"@timestamp":"2021-12-09T16:50:58Z","@service":"benthos","component":"benthos.output","level":"INFO","message":"Dropping messages."}
{"@timestamp":"2021-12-09T16:50:58Z","@service":"benthos","component":"benthos","level":"INFO","message":"Launching a benthos instance, use CTRL+C to close."}
{"@timestamp":"2021-12-09T16:50:58Z","@service":"benthos","component":"benthos","level":"INFO","message":"Listening for HTTP requests at: http://0.0.0.0:4195"}
{"@timestamp":"2021-12-09T16:51:03Z","@service":"benthos","component":"benthos.output.processor.0","level":"INFO","message":"dropping a batch"}
^C{"@timestamp":"2021-12-09T16:51:05Z","@service":"benthos","component":"benthos","level":"INFO","message":"Received SIGTERM, the service is closing."}
{"@timestamp":"2021-12-09T16:51:20Z","@service":"benthos","component":"benthos","level":"INFO","message":"Unable to fully drain buffered messages within target time."}
{"@timestamp":"2021-12-09T16:51:25Z","@service":"benthos","component":"benthos","level":"ERROR","message":"Failed to stop stream gracefully within target time."}

Expected output

(observed when I remove the period policy)

{"@timestamp":"2021-12-09T16:54:50Z","@service":"benthos","component":"benthos.input","level":"INFO","message":"Consuming from file '/data/input/pypi/pypi_results_000000000000.jsonl'"}
{"@timestamp":"2021-12-09T16:54:50Z","@service":"benthos","component":"benthos.output","level":"INFO","message":"Dropping messages."}
{"@timestamp":"2021-12-09T16:54:50Z","@service":"benthos","component":"benthos","level":"INFO","message":"Launching a benthos instance, use CTRL+C to close."}
{"@timestamp":"2021-12-09T16:54:50Z","@service":"benthos","component":"benthos","level":"INFO","message":"Listening for HTTP requests at: http://0.0.0.0:4195"}
{"@timestamp":"2021-12-09T16:54:55Z","@service":"benthos","component":"benthos.output.processor.0","level":"INFO","message":"dropping a batch"}
^C{"@timestamp":"2021-12-09T16:54:56Z","@service":"benthos","component":"benthos","level":"INFO","message":"Received SIGTERM, the service is closing."}
{"@timestamp":"2021-12-09T16:54:56Z","@service":"benthos","component":"benthos.output.processor.0","level":"INFO","message":"dropping a batch"}

Jeffail commented 2 years ago

This is an interesting one as it's intentional: https://github.com/Jeffail/benthos/blob/master/lib/output/batcher.go#L105, but also could be changed if we decide the behaviour isn't useful.

When you shut Benthos down with a SIGTERM it begins a graceful termination: we'd like to stop, but we're happy to wait for a short period of time in order to flush pending messages, commit pending acks, and make sure we have our affairs in order before we do.

However, a timer also starts (configurable with the field shutdown_timeout). When it expires, all components are given the signal to terminate immediately: they are expected to close connections even if there are pending messages/acks, with the accepted trade-off that duplicates might occur the next time we start.

When you configure a batching policy without a period and we enter graceful termination, we flush the partial batch, because without a period there is no remaining batching condition that might trigger (the input should also have stopped consuming).

However, when a period is specified, there is still a mechanism for finishing the batch as configured even though we aren't expecting new messages. If the batching period is significantly shorter than the shutdown timeout then I think it's fair to wait, as failing to do so could have an impact on downstream services. If the batching period is longer than the shutdown timeout then there's no point blocking: we should either drop the batch or flush it immediately.

There's no way for the batcher to actually know how long the timeout is right now without it being triggered, so we can't do the smart thing. I think in hindsight it's probably worth flushing immediately as that's quite intuitive and unlikely to harm downstream components in practice.
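
To illustrate the "flush immediately" option discussed above, here is a minimal Go sketch. It is not the code in batcher.go; `runBatcher`, `closeAtLeisure`, and `flushOnShutdown` are hypothetical names chosen for this example, and the batcher is reduced to a single loop with a period timer. The point is only that the graceful-shutdown signal can flush the partial batch without waiting for the period to elapse.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher is a hypothetical, simplified stand-in for an output batcher.
// It buffers messages from in and flushes them when the period elapses,
// when in is closed, or as soon as closeAtLeisure is closed.
func runBatcher(in <-chan string, period time.Duration, closeAtLeisure <-chan struct{}, flush func([]string)) {
	var pending []string
	timer := time.NewTimer(period)
	defer timer.Stop()

	flushPending := func() {
		if len(pending) > 0 {
			flush(pending)
			pending = nil
		}
	}

	for {
		select {
		case msg, open := <-in:
			if !open {
				flushPending()
				return
			}
			pending = append(pending, msg)
		case <-timer.C:
			flushPending()
			timer.Reset(period)
		case <-closeAtLeisure:
			// Graceful shutdown: flush now instead of waiting for the period.
			flushPending()
			return
		}
	}
}

// flushOnShutdown runs the sketch with a one-hour period and returns the
// batch that is flushed once graceful shutdown is signalled.
func flushOnShutdown() []string {
	in := make(chan string) // unbuffered, so each send returns only after receipt
	closeAtLeisure := make(chan struct{})
	flushed := make(chan []string, 1)

	go runBatcher(in, time.Hour, closeAtLeisure, func(b []string) { flushed <- b })

	in <- "a"
	in <- "b"
	close(closeAtLeisure)
	return <-flushed
}

func main() {
	start := time.Now()
	batch := flushOnShutdown()
	fmt.Println(batch, "flushed within a second:", time.Since(start) < time.Second)
}
```

With this ordering the one-hour period never blocks shutdown; whether downstream services tolerate the early partial batch is the trade-off described above.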

disintegrator commented 2 years ago

@Jeffail

> I think in hindsight it's probably worth flushing immediately as that's quite intuitive and unlikely to harm downstream components in practice.

That was my intuition as well. It would be possible to advise users to raise the shutdown timeout so that it exceeds any period policies in effect, but that is likely to cause issues with the process supervisor (e.g. k8s/containerd/...), and it's not great for CI/CD to hold deployments beyond 60s while waiting on a process.

For my use case, I don't think I need anything over 30s and I caught this issue because I'm tuning a lot of things as part of testing the framework.

gesixin commented 2 years ago

@Jeffail In the file output/batcher.go, the channel m.shutSig.CloseNowChan is used many times, but nothing ever closes it. Sometimes this causes the process to get stuck.

Jeffail commented 2 years ago

Thanks @gesixin! Did another tidy up which I think should fix it: https://github.com/Jeffail/benthos/commit/933ff6679b352dd926607c58f4c817280d023331

sasanrose commented 1 year ago

Hey @Jeffail, I think this still happens when you wrap your outputs in a switch output. When a SIGTERM/SIGINT signal is received, the transactions channel in the switch is closed, so the loop stops and the deferred func gets executed (https://github.com/benthosdev/benthos/blob/main/internal/impl/pure/output_switch.go#L374). However, that func waits for all the pending acks to be resolved by the target outputs and only then sends the shutdown signal to them. If the target outputs have a period, they won't flush until they receive the shutdown signal, so this blocks until the shutdown timeout expires. The only way I can see for the ackPending check in the switch to break is for someone to call TriggerCloseNow on the switch itself, which apparently isn't happening at all when the switch is the top-layer output.