warpstreamlabs / bento

Fancy stream processing made operationally mundane. This repository is a fork of the original project before the license was changed.
https://warpstreamlabs.github.io/bento/
Other
934 stars 51 forks source link

Auto Retry Toggle #91

Open JuchangGit opened 1 month ago

JuchangGit commented 1 month ago

能否添加自动重试的开关或者控制方式,因为bento目前的工作方式是输入和输出错误发生错误时会一直重试下去,这对于有些场景是不适合的。 比如:输入为数据库时,上游如果改变了表结构,那么bento将不断重试,上游可能认为这是恶意攻击

能否添加控制重试的次数或是否自动重试的开关来控制bento的默认行为

jem-davies commented 1 month ago

Translation: (from google but also checked by a native speaker)

Can you add a switch or control method for automatic retry? Because the current working method of Bento is that it will always retry when an input or output error occurs, which is not suitable for some scenarios.

For example: when the input is a database, if the upstream changes the table structure, Bento will continue to retry, and the upstream may think this is a malicious attack.

Can I add a switch that controls the number of retries or whether to automatically retry to control the default behavior of bento?

jem-davies commented 1 month ago

你可以使用 streams mode 来解决这个问题。这种方法可以使用一个/ready endpoint 来报错没有被连接的 streams(bento configs)。然后你可以对他们执行一个CRUD操作来删除那些报错的stream。

这里我有一个 stream (config.yaml) 可以通过endpoint来检查并且删除那些连接失败的stream。

It is possible to use streams mode and what this allows you to do is poll a /ready endpoint that will return streams (bento configs) that aren't connected. You can then perform CRUD operations on the streams so you could then delete the offending streams.

Here I have a stream (config.yaml) that polls the endpoint and will delete the streams if they are reporting not ready:

sql_test
├── resources.yaml
└── streams
    ├── config.yaml
    ├── sql.yaml
    └── sql_2.yaml

./resources.yaml

rate_limit_resources:
  - label: ready_rate_limit
    local:
      count: 1
      interval: 2s

./streams/config.yaml

input:
  http_client:
    url: "http://0.0.0.0:4195/ready"
    verb: GET
    timeout: 5s
    successful_on: [503]
    rate_limit: ready_rate_limit

pipeline:
  processors:
      - switch: 
          - check: content().string() == "OK"
            processors:
              - mapping: |
                  root = deleted()

      - mapping: |
          root = {"response": content().string()}

      - mapping: |
          root.streams = this.response.replace("streams ", "").replace(" are not connected\n", "").split(", ")

      - branch:
          processors:
              - http:
                  url: "http://0.0.0.0:4195/streams/${! streams.0 }"
                  verb: DELETE
                  timeout: 5s

      - mapping: |
          root = {"deleted_stream":streams.0}

output:
  stdout: {}

./streams/sql.yaml

input:
  sql_select:
    driver: postgres
    dsn: postgres://admin:admin@0.0.0.0:5432/test_bento?sslmode=disable
    table: people_copy
    columns: ['first_name', 'last_name', 'age']

output:
  stdout: {}

./streams/sql_2.yaml (same as sql.yaml)

bento:

bento -r resources.yaml streams ./streams/*.yaml 

Output:

INFO Running without a main config file            @service=bento bento_version=""
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=bento
INFO Output type stdout is now active              @service=bento label="" path=root.output stream=sql_2
INFO Input type http_client is now active          @service=bento label="" path=root.input stream=config
INFO Output type stdout is now active              @service=bento label="" path=root.output stream=sql
INFO Output type stdout is now active              @service=bento label="" path=root.output stream=config
INFO Launching Bento in streams mode, use CTRL+C to close  @service=bento
ERRO Failed to connect to sql_select: pq: column "first_name" does not exist  @service=bento label="" path=root.input stream=sql_2
{"deleted_stream":"sql"}
ERRO Failed to connect to sql_select: pq: column "first_name" does not exist  @service=bento label="" path=root.input stream=sql_2
ERRO Failed to connect to sql_select: pq: column "first_name" does not exist  @service=bento label="" path=root.input stream=sql_2
{"deleted_stream":"sql_2"}
JuchangGit commented 1 month ago

这确实是一种可行的方案,但我要的不是这种。 我的用法是让bento作为执行器,调度程序调用bento来处理数据,调度器只需调起和检测bento的退出状态来确定接下来的任务怎么执行。 像下面这样: a→b→c a,b,c都是bento的实例,a失败则b,c都不执行。 我要的是bento只要出错,不论是input、pipeline或output,bento的退出状态码都是非0 。

不知道我表达的是否清楚

jem-davies commented 4 weeks ago

https://github.com/warpstreamlabs/bento/issues/92

Added this ticket to consider adding the option to allow such a thing, however pretty sure that the original project that this was forked from never intended to enable such a thing.

That it is by design that bento shouldn't every stop trying to connect to an input.

JuchangGit commented 3 weeks ago

是否可以为input和output提供一个配置项——最大重试次数 max_retry_num ,默认值为 -1 表示一直重试(和现在的机制一样), 让用户可以控制重试的次数。配置像下面这样:

input:
  max_retry_num: 2
  stdin:
    scanner:
      lines: {}
    auto_replay_nacks: true
buffer:
  none: {}
pipeline:
  threads: -1
  processors: []
output:
  max_retry_num: 3
  stdout:
    codec: lines
JuchangGit commented 2 weeks ago

https://github.com/warpstreamlabs/bento/issues/91#issuecomment-2282210672 可能有个bug,当把./streams/sql_2.yaml里面的数据库连接串写错,ip或者端口port写错, /ready 的返回值是 ok