redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

Memory not released after all streams finished #2771

Open fearfate opened 1 month ago

fearfate commented 1 month ago

I used Connect to run a batch synchronization of a BigQuery table with 50 GB of storage usage, but I found that after the task completed, the memory usage stayed the same. I'm confused about this. Could someone tell me how to deal with it? Thanks!

I ran this in streams mode.

My stream config:

input:
  label: "bigquery_package_versions"
  broker:
    inputs:
    - gcp_bigquery_select:
        project: example
        table: bigquery-public-data.deps_dev_v1.PackageVersions
        columns:
        - SnapshotAt
        - System
        - Name
        - Version
        - Licenses
        - Links
        - Advisories
        - VersionInfo
        - Hashes
        - DependenciesProcessed
        - DependencyError
        - UpstreamPublishedAt
        - Registries
        - SLSAProvenance
        - UpstreamIdentifiers
        where:  SnapshotAt >= ? AND SnapshotAt < ? # No default (optional)
        auto_replay_nacks: true
        # job_labels: {}
        # priority: ""
        args_mapping: |
          root = ["2024-07-29", "2024-08-08"]
        # prefix: "" # No default (optional)
        # 87212989
        suffix: |
          LIMIT 15000000 OFFSET 0
    batching:
      byte_size: 268435456
      period: 10m
      processors:
      - mapping: |          #!blobl
          root = this
          root.SnapshotAt = root.SnapshotAt.ts_unix_nano()
          if root.exists("UpstreamPublishedAt") && root.UpstreamPublishedAt != null {
            root.UpstreamPublishedAt = root.UpstreamPublishedAt.ts_unix_nano()
          }

          meta SnapshotAt = root.SnapshotAt

pipeline:
  processors:
  - parquet_encode:
      schema:
      - name: SnapshotAt
        type: INT64
      - name: System
        type: UTF8
      - name: Name
        type: UTF8
      - name: Version
        type: UTF8
      - name: Licenses
        type: UTF8
        repeated: true
      - name: Links
        repeated: true
        fields:
          - name: Label
            type: UTF8
          - name: URL
            type: UTF8
      - name: Advisories
        repeated: true
        fields:
          - name: Source
            type: UTF8
          - name: SourceID
            type: UTF8
      - name: VersionInfo
        optional: true
        fields:
          - name: IsRelease
            type: BOOLEAN
          - name: Ordinal
            type: INT64
      - name: Hashes
        repeated: true
        fields:
          - name: Type
            type: UTF8
          - name: Hash
            type: UTF8
      - name: DependenciesProcessed
        optional: true
        type: BOOLEAN
      - name: DependencyError
        optional: true
        type: BOOLEAN
      - name: UpstreamPublishedAt
        optional: true
        type: INT64
      - name: Registries
        type: UTF8
        repeated: true
      - name: SLSAProvenance
        optional: true
        fields:
          - name: SourceRepository
            type: UTF8
          - name: Commit
            type: UTF8
          - name: URL
            type: UTF8
          - name: Verified
            type: BOOLEAN
      - name: UpstreamIdentifiers
        repeated: true
        fields:
          - name: PackageName
            optional: true
            type: UTF8
          - name: VersionString
            optional: true
            type: UTF8
          - name: Source
            type: UTF8
      default_compression: zstd

output:
  file:
    path: ${YSDB_WORKER_DATA_HOME:/opt/ysdb-worker/data}/package_versions/${! meta("SnapshotAt") }/${! timestamp_unix_nano() }.parquet # No default (required)
    codec: all-bytes

And the metrics:

# HELP batch_created Benthos Counter metric
# TYPE batch_created counter
batch_created{label="bigquery_package_versions",mechanism="check",path="root.input.batching",stream="version"} 0
batch_created{label="bigquery_package_versions",mechanism="count",path="root.input.batching",stream="version"} 0
batch_created{label="bigquery_package_versions",mechanism="period",path="root.input.batching",stream="version"} 0
batch_created{label="bigquery_package_versions",mechanism="size",path="root.input.batching",stream="version"} 71
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.000139994
go_gc_duration_seconds{quantile="0.25"} 0.000161236
go_gc_duration_seconds{quantile="0.5"} 0.000174105
go_gc_duration_seconds{quantile="0.75"} 0.00019372
go_gc_duration_seconds{quantile="1"} 0.00047506
go_gc_duration_seconds_sum 2.914484991
go_gc_duration_seconds_count 1168
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 378676
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.22.5"} 1
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
# TYPE go_memstats_alloc_bytes gauge
go_memstats_alloc_bytes 4.017885552e+09
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
# TYPE go_memstats_alloc_bytes_total counter
go_memstats_alloc_bytes_total 9.99175765096e+11
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
# TYPE go_memstats_buck_hash_sys_bytes gauge
go_memstats_buck_hash_sys_bytes 1.922458e+06
# HELP go_memstats_frees_total Total number of frees.
# TYPE go_memstats_frees_total counter
go_memstats_frees_total 1.1805682842e+10
# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
# TYPE go_memstats_gc_sys_bytes gauge
go_memstats_gc_sys_bytes 1.1821236e+08
# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
# TYPE go_memstats_heap_alloc_bytes gauge
go_memstats_heap_alloc_bytes 4.017885552e+09
# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
# TYPE go_memstats_heap_idle_bytes gauge
go_memstats_heap_idle_bytes 8.290402304e+09
# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
# TYPE go_memstats_heap_inuse_bytes gauge
go_memstats_heap_inuse_bytes 4.57490432e+09
# HELP go_memstats_heap_objects Number of allocated objects.
# TYPE go_memstats_heap_objects gauge
go_memstats_heap_objects 4.3511309e+07
# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
# TYPE go_memstats_heap_released_bytes gauge
go_memstats_heap_released_bytes 7.828144128e+09
# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
# TYPE go_memstats_heap_sys_bytes gauge
go_memstats_heap_sys_bytes 1.2865306624e+10
# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
# TYPE go_memstats_last_gc_time_seconds gauge
go_memstats_last_gc_time_seconds 1.7234258623280537e+09
# HELP go_memstats_lookups_total Total number of pointer lookups.
# TYPE go_memstats_lookups_total counter
go_memstats_lookups_total 0
# HELP go_memstats_mallocs_total Total number of mallocs.
# TYPE go_memstats_mallocs_total counter
go_memstats_mallocs_total 1.1849194151e+10
# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures.
# TYPE go_memstats_mcache_inuse_bytes gauge
go_memstats_mcache_inuse_bytes 19200
# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system.
# TYPE go_memstats_mcache_sys_bytes gauge
go_memstats_mcache_sys_bytes 31200
# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures.
# TYPE go_memstats_mspan_inuse_bytes gauge
go_memstats_mspan_inuse_bytes 9.207696e+07
# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system.
# TYPE go_memstats_mspan_sys_bytes gauge
go_memstats_mspan_sys_bytes 2.2848e+08
# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place.
# TYPE go_memstats_next_gc_bytes gauge
go_memstats_next_gc_bytes 8.278799096e+09
# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations.
# TYPE go_memstats_other_sys_bytes gauge
go_memstats_other_sys_bytes 2.129251e+07
# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator.
# TYPE go_memstats_stack_inuse_bytes gauge
go_memstats_stack_inuse_bytes 7.99637504e+08
# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator.
# TYPE go_memstats_stack_sys_bytes gauge
go_memstats_stack_sys_bytes 7.99637504e+08
# HELP go_memstats_sys_bytes Number of bytes obtained from system.
# TYPE go_memstats_sys_bytes gauge
go_memstats_sys_bytes 1.4034882656e+10
# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 22
# HELP input_connection_failed Benthos Counter metric
# TYPE input_connection_failed counter
input_connection_failed{label="",path="root.input.broker.inputs.0",stream="version"} 0
# HELP input_connection_lost Benthos Counter metric
# TYPE input_connection_lost counter
input_connection_lost{label="",path="root.input.broker.inputs.0",stream="version"} 0
# HELP input_connection_up Benthos Counter metric
# TYPE input_connection_up counter
input_connection_up{label="",path="root.input.broker.inputs.0",stream="version"} 2
# HELP input_latency_ns Benthos Timing metric
# TYPE input_latency_ns summary
input_latency_ns{label="",path="root.input.broker.inputs.0",stream="version",quantile="0.5"} NaN
input_latency_ns{label="",path="root.input.broker.inputs.0",stream="version",quantile="0.9"} NaN
input_latency_ns{label="",path="root.input.broker.inputs.0",stream="version",quantile="0.99"} NaN
input_latency_ns_sum{label="",path="root.input.broker.inputs.0",stream="version"} 3.854610930651667e+18
input_latency_ns_count{label="",path="root.input.broker.inputs.0",stream="version"} 1.484165e+07
# HELP input_received Benthos Counter metric
# TYPE input_received counter
input_received{label="",path="root.input.broker.inputs.0",stream="version"} 1.5220244e+07
# HELP output_batch_sent Benthos Counter metric
# TYPE output_batch_sent counter
output_batch_sent{label="",path="root.output",stream="version"} 70
# HELP output_connection_failed Benthos Counter metric
# TYPE output_connection_failed counter
output_connection_failed{label="",path="root.output",stream="version"} 0
# HELP output_connection_lost Benthos Counter metric
# TYPE output_connection_lost counter
output_connection_lost{label="",path="root.output",stream="version"} 0
# HELP output_connection_up Benthos Counter metric
# TYPE output_connection_up counter
output_connection_up{label="",path="root.output",stream="version"} 1
# HELP output_error Benthos Counter metric
# TYPE output_error counter
output_error{label="",path="root.output",stream="version"} 0
# HELP output_latency_ns Benthos Timing metric
# TYPE output_latency_ns summary
output_latency_ns{label="",path="root.output",stream="version",quantile="0.5"} NaN
output_latency_ns{label="",path="root.output",stream="version",quantile="0.9"} NaN
output_latency_ns{label="",path="root.output",stream="version",quantile="0.99"} NaN
output_latency_ns_sum{label="",path="root.output",stream="version"} 5.135906716e+09
output_latency_ns_count{label="",path="root.output",stream="version"} 70
# HELP output_sent Benthos Counter metric
# TYPE output_sent counter
output_sent{label="",path="root.output",stream="version"} 70
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 14875.87
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 4096
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 13
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 1.0702270464e+10
# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1.72329419936e+09
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 1.5477092352e+10
# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes.
# TYPE process_virtual_memory_max_bytes gauge
process_virtual_memory_max_bytes 1.8446744073709552e+19
# HELP processor_batch_received Benthos Counter metric
# TYPE processor_batch_received counter
processor_batch_received{label="",path="root.input.batching.processors.0",stream="version"} 71
processor_batch_received{label="",path="root.pipeline.processors.0",stream="version"} 70
# HELP processor_batch_sent Benthos Counter metric
# TYPE processor_batch_sent counter
processor_batch_sent{label="",path="root.input.batching.processors.0",stream="version"} 71
processor_batch_sent{label="",path="root.pipeline.processors.0",stream="version"} 70
# HELP processor_error Benthos Counter metric
# TYPE processor_error counter
processor_error{label="",path="root.input.batching.processors.0",stream="version"} 0
processor_error{label="",path="root.pipeline.processors.0",stream="version"} 0
# HELP processor_latency_ns Benthos Timing metric
# TYPE processor_latency_ns summary
processor_latency_ns{label="",path="root.input.batching.processors.0",stream="version",quantile="0.5"} NaN
processor_latency_ns{label="",path="root.input.batching.processors.0",stream="version",quantile="0.9"} NaN
processor_latency_ns{label="",path="root.input.batching.processors.0",stream="version",quantile="0.99"} NaN
processor_latency_ns_sum{label="",path="root.input.batching.processors.0",stream="version"} 8.41241099571e+11
processor_latency_ns_count{label="",path="root.input.batching.processors.0",stream="version"} 71
processor_latency_ns{label="",path="root.pipeline.processors.0",stream="version",quantile="0.5"} NaN
processor_latency_ns{label="",path="root.pipeline.processors.0",stream="version",quantile="0.9"} NaN
processor_latency_ns{label="",path="root.pipeline.processors.0",stream="version",quantile="0.99"} NaN
processor_latency_ns_sum{label="",path="root.pipeline.processors.0",stream="version"} 5.17977569825e+11
processor_latency_ns_count{label="",path="root.pipeline.processors.0",stream="version"} 70
# HELP processor_received Benthos Counter metric
# TYPE processor_received counter
processor_received{label="",path="root.input.batching.processors.0",stream="version"} 1.5061893e+07
processor_received{label="",path="root.pipeline.processors.0",stream="version"} 1.484165e+07
# HELP processor_sent Benthos Counter metric
# TYPE processor_sent counter
processor_sent{label="",path="root.input.batching.processors.0",stream="version"} 1.5061893e+07
processor_sent{label="",path="root.pipeline.processors.0",stream="version"} 70
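
For reading the heap gauges above: Go's runtime.MemStats documentation defines HeapIdle minus HeapReleased as memory the runtime could return to the OS but is keeping so it can grow the heap, and HeapInuse minus HeapAlloc as memory dedicated to size classes that is not currently holding objects. The Go sketch below only does that arithmetic on values copied by hand from this dump. The heapBreakdown type and its methods are hypothetical helpers, not part of Connect.

```go
package main

import "fmt"

// heapBreakdown holds the heap gauges from the metrics dump above.
// Field names mirror runtime.MemStats; the values are copied by hand.
type heapBreakdown struct {
	HeapAlloc    float64 // go_memstats_heap_alloc_bytes
	HeapInuse    float64 // go_memstats_heap_inuse_bytes
	HeapIdle     float64 // go_memstats_heap_idle_bytes
	HeapReleased float64 // go_memstats_heap_released_bytes
	HeapSys      float64 // go_memstats_heap_sys_bytes
}

// retainedIdle: memory the runtime could return to the OS but keeps
// for future heap growth (HeapIdle - HeapReleased).
func (h heapBreakdown) retainedIdle() float64 { return h.HeapIdle - h.HeapReleased }

// sizeClassSlack: memory dedicated to size classes but not holding
// allocated objects (HeapInuse - HeapAlloc).
func (h heapBreakdown) sizeClassSlack() float64 { return h.HeapInuse - h.HeapAlloc }

// stillMapped: heap memory obtained from the OS and not yet released back.
func (h heapBreakdown) stillMapped() float64 { return h.HeapSys - h.HeapReleased }

const gib = 1 << 30

func main() {
	h := heapBreakdown{
		HeapAlloc:    4.017885552e+09,
		HeapInuse:    4.57490432e+09,
		HeapIdle:     8.290402304e+09,
		HeapReleased: 7.828144128e+09,
		HeapSys:      1.2865306624e+10,
	}
	fmt.Printf("allocated heap objects: %.2f GiB\n", h.HeapAlloc/gib)
	fmt.Printf("size-class slack:       %.2f GiB\n", h.sizeClassSlack()/gib)
	fmt.Printf("idle, not released:     %.2f GiB\n", h.retainedIdle()/gib)
	fmt.Printf("heap still mapped:      %.2f GiB\n", h.stillMapped()/gib)
}
```

With these numbers the program prints 3.74, 0.52, 0.43 and 4.69 GiB respectively. Only about 0.43 GiB is idle memory the runtime is holding back, so most of the retained heap appears to be allocated objects (reachable, or not yet freed by the GC) rather than free memory that simply has not been returned to the OS.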

By the way, how can I build a pagination loop for the gcp_bigquery_select input, using LIMIT and OFFSET, to scan all rows of the table?
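
One possible approach, sketched in Go under the assumption that the total row count is known in advance (for example from a separate SELECT COUNT(*) query), is to precompute one LIMIT/OFFSET suffix per page and run one gcp_bigquery_select pass per suffix. pageSuffixes is a hypothetical helper, and treating the 87212989 comment in the config as the row count is a guess. Without an ORDER BY, SQL does not guarantee the same row order across queries, so pages could overlap or skip rows; the optional orderBy argument is there for that reason, and which columns to order by is up to you.

```go
package main

import "fmt"

// pageSuffixes builds one "LIMIT n OFFSET k" suffix per page so that the
// pages together cover rows [0, total). orderBy is an optional, hypothetical
// stable ordering clause such as "ORDER BY SnapshotAt, System, Name, Version".
func pageSuffixes(total, pageSize int64, orderBy string) []string {
	if total <= 0 || pageSize <= 0 {
		return nil
	}
	var out []string
	for offset := int64(0); offset < total; offset += pageSize {
		s := fmt.Sprintf("LIMIT %d OFFSET %d", pageSize, offset)
		if orderBy != "" {
			s = orderBy + " " + s // ORDER BY must precede LIMIT/OFFSET
		}
		out = append(out, s)
	}
	return out
}

func main() {
	// Assumption: 87212989 (a comment in the config above) is the row count.
	for i, p := range pageSuffixes(87212989, 15000000, "") {
		fmt.Printf("page %d: %s\n", i, p)
	}
}
```

Splitting on the where arguments instead, as the config already does with the SnapshotAt range in args_mapping, avoids depending on row order altogether.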

fearfate commented 1 month ago

Here is the `top` output. ysdb-worker is Connect built with some custom components and custom Bloblang functions:

[root@prod-pve-worklink-yops-ysdb ~]# top
top - 01:10:18 up 5 days, 12:54,  2 users,  load average: 0.24, 0.09, 0.06
Tasks: 191 total,   1 running, 190 sleeping,   0 stopped,   0 zombie
%Cpu(s):  0.0 us,  0.0 sy,  0.0 ni,100.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem : 32778176 total, 12321100 free, 11014104 used,  9442972 buff/cache
KiB Swap:        0 total,        0 free,        0 used. 21336116 avail Mem

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
17910 root      20   0   14.4g  10.0g  38748 S   0.3 31.9 247:00.85 ysdb-worker

mihaitodor commented 3 weeks ago

Hey @fearfate, I had a quick look over the config, but it would be difficult to reproduce the issue you're seeing on my end. Unless the Google library used by gcp_bigquery_select does something silly, the issue should still pop up if you use a simpler input. Any chance you could try reproducing it using a file or generate input (or even a custom input which doesn't rely on 3rd party services)?

fearfate commented 1 week ago

I changed how I use it and can no longer reproduce this. Thanks for your reply!

mihaitodor commented 2 days ago

OK, does that mean the issue has been resolved?