apache / incubator-streampark

Make stream processing easier! Easy-to-use streaming application development framework and operation platform.
https://streampark.apache.org/
Apache License 2.0

[Proposal] Optimize the configuration inside flink-application.conf #1739

Closed 1996fanrui closed 2 years ago

1996fanrui commented 2 years ago

Describe the proposal

Mailing list: https://lists.apache.org/thread/w0lj2zmngt82j38gzgoyztkwyox9oy3o

StreamPark defines many configurations that have the same meaning as Flink configurations, but with different configuration names.

For example, Flink has pipeline.auto-watermark-interval to configure the interval of automatic watermark emission, but StreamPark defines its own configuration for the same purpose: flink.watermark.interval.
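
To make the overlap concrete, here is a minimal Python sketch of a rename table from StreamPark-specific keys to the Flink keys that carry the same meaning. The pairs are read off the old and new configurations listed later in this issue. The names `LEGACY_TO_FLINK`, `DURATION_KEYS`, `ms_to_duration`, and `translate` are hypothetical and are not part of StreamPark, and the sketch assumes the old interval and timeout values are integer milliseconds.

```python
# Hypothetical rename table: StreamPark-specific key -> Flink key.
# The pairs are taken from the old/new configuration in this issue.
LEGACY_TO_FLINK = {
    "flink.watermark.interval": "pipeline.auto-watermark-interval",
    "flink.checkpoints.interval": "execution.checkpointing.interval",
    "flink.checkpoints.timeout": "execution.checkpointing.timeout",
    "flink.checkpoints.mode": "execution.checkpointing.mode",
    "flink.checkpoints.unaligned": "execution.checkpointing.unaligned",
}

# Assumption: these old keys hold integer milliseconds.
DURATION_KEYS = {
    "flink.watermark.interval",
    "flink.checkpoints.interval",
    "flink.checkpoints.timeout",
}


def ms_to_duration(ms: int) -> str:
    """Render milliseconds as a duration string such as '30s' or '1500ms'."""
    if ms % 1000 == 0:
        return f"{ms // 1000}s"
    return f"{ms}ms"


def translate(legacy: dict) -> dict:
    """Rename known legacy keys; keys not in the table are kept unchanged."""
    out = {}
    for key, value in legacy.items():
        new_key = LEGACY_TO_FLINK.get(key, key)
        if key in DURATION_KEYS and isinstance(value, int):
            value = ms_to_duration(value)
        out[new_key] = value
    return out
```

Every entry in such a table is something a user has to learn and StreamPark has to maintain, which is the cost described in this proposal.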

Flink 1.12 configuration doc

Flink 1.15 configuration doc

This causes some problems:

  1. The configuration is duplicated, which adds learning cost for users.
  2. Every time a new Flink version is released, StreamPark has to keep its own keys compatible with the corresponding Flink configuration, which leads to a lot of repetitive code.

I propose to use Flink's official configuration to avoid these problems. It will bring some benefits:

Task list

Old configuration

flink:
  deployment:
    property: 
      $internal.application.main:
      pipeline.name:
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
  checkpoints:
    enable: true
    interval: 30000
    mode: EXACTLY_ONCE
    timeout: 300000
    unaligned: true
  watermark:
    interval: 10000
  # state backend
  state:
    backend: # see https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html
      value: filesystem # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
      memory: 5242880 # Effective for jobmanager, maximum memory
      async: false    # Valid for (jobmanager, filesystem), whether to enable asynchronous
      incremental: true # Valid for rocksdb, whether to enable incremental
      # Configuration reference of rocksdb https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend
      # Remove the prefix of rocksdb configuration key: state.backend
      #rocksdb.block.blocksize:
    checkpoint-storage: filesystem # Special note: This parameter is only valid in flink 1.12+, and the state.backend.value is hashmap, optional: (jobmanager | filesystem)
    checkpoints.dir: file:///tmp/chkdir
    savepoints.dir: file:///tmp/chkdir
    checkpoints.num-retained: 1
  # restart strategy
  restart-strategy:
    value: fixed-delay  # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
    fixed-delay:
      attempts: 3
      delay: 5000
    failure-rate:
      max-failures-per-interval:
      failure-rate-interval:
      delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming #(batch|streaming)
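
The nested layout above corresponds to dotted keys such as `flink.watermark.interval`. The following Python sketch flattens a hand-copied excerpt of the old configuration and separates the keys that are already in Flink's namespace from the ones StreamPark defines itself. It assumes that everything under `flink.deployment.property` is handed to Flink unchanged. `OLD_CONFIG`, `flatten`, and `split_keys` are hypothetical names and not StreamPark's actual loader.

```python
# Hand-copied excerpt of the old configuration (illustrative subset only).
OLD_CONFIG = {
    "flink": {
        "deployment": {
            "property": {
                "parallelism.default": 2,
                "taskmanager.memory": {"managed.fraction": 0.4},
            }
        },
        "checkpoints": {"enable": True, "interval": 30000},
        "watermark": {"interval": 10000},
        "state": {"checkpoints.dir": "file:///tmp/chkdir"},
    }
}

# Assumption: keys under this prefix are passed to Flink as-is.
PASS_THROUGH_PREFIX = "flink.deployment.property."


def flatten(node, prefix=""):
    """Yield (dotted_key, value) pairs for every leaf of a nested dict."""
    for key, value in node.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            yield from flatten(value, path)
        else:
            yield path, value


def split_keys(config):
    """Separate Flink-namespace keys from StreamPark-specific ones."""
    flink_keys, custom_keys = {}, {}
    for path, value in flatten(config):
        if path.startswith(PASS_THROUGH_PREFIX):
            flink_keys[path[len(PASS_THROUGH_PREFIX):]] = value
        else:
            custom_keys[path] = value
    return flink_keys, custom_keys
```

Under that assumption, everything that lands in `custom_keys` (checkpoints, watermark, state, restart strategy) is a candidate for replacement by an official Flink key.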

New configuration

flink:
  deployment:
    property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
      $internal.application.main:
      pipeline.name:
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
      pipeline:
        auto-watermark-interval: 200ms
      # checkpoint
      execution:
        checkpointing:
          mode: EXACTLY_ONCE
          interval: 30s
          timeout: 10min
          unaligned: false
          externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
      # state backend
      state:
        backend: hashmap # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
        backend.incremental: true
        checkpoint-storage: filesystem
        savepoints.dir: file:///tmp/chkdir
        checkpoints.dir: file:///tmp/chkdir
      # restart strategy
      restart-strategy: fixed-delay  # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
      restart-strategy.fixed-delay:
        attempts: 3
        delay: 5000
      restart-strategy.failure-rate:
        max-failures-per-interval:
        failure-rate-interval:
        delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming #(batch|streaming)
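
Note that the new configuration writes time values as strings with a unit (`200ms`, `30s`, `10min`), where the old one used bare millisecond numbers. The Python sketch below shows how such strings relate to milliseconds. It is a simplified illustration that handles only a small subset of unit labels and treats a bare number as milliseconds; it is not Flink's own parser, and `duration_to_ms` is a hypothetical name.

```python
import re

# Subset of unit labels only; Flink's own parser accepts more spellings.
_UNIT_TO_MS = {
    "ms": 1,
    "s": 1_000,
    "min": 60_000,
    "h": 3_600_000,
    "d": 86_400_000,
}

# A non-negative integer, optionally followed by a unit label.
_PATTERN = re.compile(r"^\s*(\d+)\s*([a-z]*)\s*$")


def duration_to_ms(text) -> int:
    """Convert values such as '30s', '10min', '200ms' or 5000 to milliseconds."""
    match = _PATTERN.match(str(text).lower())
    if match is None:
        raise ValueError(f"not a duration: {text!r}")
    number, unit = match.groups()
    if unit == "":
        # Assumption: a bare number means milliseconds.
        return int(number)
    if unit not in _UNIT_TO_MS:
        raise ValueError(f"unsupported unit in {text!r}")
    return int(number) * _UNIT_TO_MS[unit]
```

For example, `duration_to_ms("30s")` matches the old `interval: 30000`, while `duration_to_ms("10min")` is 600000, so the timeout in the new example is not the same value as the old `timeout: 300000`.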

wolfboys commented 2 years ago

Hi 1996fanrui,

Looks good. Can you fully describe the format of the configuration file before and after the change? It would be better to discuss this on the mailing list.

e.g:

before:

flink:
  deployment:
    property: 
      $internal.application.main:
      pipeline.name:
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
  checkpoints:
    enable: true
    interval: 30000
    mode: EXACTLY_ONCE
    timeout: 300000
    unaligned: true
  watermark:
    interval: 10000
  # state backend
  state:
    backend: # see https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html
      value: filesystem # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
      memory: 5242880 # Effective for jobmanager, maximum memory
      async: false    # Valid for (jobmanager, filesystem), whether to enable asynchronous
      incremental: true # Valid for rocksdb, whether to enable incremental
      # Configuration reference of rocksdb https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend
      # Remove the prefix of rocksdb configuration key: state.backend
      #rocksdb.block.blocksize:
    checkpoint-storage: filesystem # Special note: This parameter is only valid in flink 1.12+, and the state.backend.value is hashmap, optional: (jobmanager | filesystem)
    checkpoints.dir: file:///tmp/chkdir
    savepoints.dir: file:///tmp/chkdir
    checkpoints.num-retained: 1
  # restart strategy
  restart-strategy:
    value: fixed-delay  # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
    fixed-delay:
      attempts: 3
      delay: 5000
    failure-rate:
      max-failures-per-interval:
      failure-rate-interval:
      delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming #(batch|streaming)

after:

flink:
  deployment:
    property: 
      $internal.application.main:
      pipeline.name:
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
  checkpoints:
    enable: true
    interval: 30000
    mode: EXACTLY_ONCE
    timeout: 300000
    unaligned: true
  # state backend
  state:
    backend: # see https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html
      value: filesystem # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
      memory: 5242880 # Effective for jobmanager, maximum memory
      async: false    # Valid for (jobmanager, filesystem), whether to enable asynchronous
      incremental: true # Valid for rocksdb, whether to enable incremental
      # Configuration reference of rocksdb https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#rocksdb-state-backend
      # Remove the prefix of rocksdb configuration key: state.backend
      #rocksdb.block.blocksize:
    checkpoint-storage: filesystem # optional: (jobmanager | filesystem)
    checkpoints.dir: file:///tmp/chkdir
    savepoints.dir: file:///tmp/chkdir
    checkpoints.num-retained: 1
  # restart strategy
  restart-strategy:
    value: fixed-delay  # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
    fixed-delay:
      attempts: 3
      delay: 5000
    failure-rate:
      max-failures-per-interval:
      failure-rate-interval:
      delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming #(batch|streaming)
pipeline:
  auto-watermark-interval: 200ms

diff desc: ......

why change:

...

1996fanrui commented 2 years ago

Looks good. Can you fully describe the format of the configuration file before and after the change? It would be better to discuss this on the mailing list.

Hi @wolfboys, thanks for the good suggestions. I have started a mailing list discussion here: https://lists.apache.org/thread/w0lj2zmngt82j38gzgoyztkwyox9oy3o

If you have any ideas, please share them on the mailing list. Thank you!

wolfboys commented 2 years ago

Good job. 👍

1996fanrui commented 2 years ago

All tasks have been merged. Thanks to @wolfboys for the suggestions and review. I will close this issue.