housepower / clickhouse_sinker

Easily load data from kafka to ClickHouse
https://housepower.github.io/clickhouse_sinker
Apache License 2.0

High memory usage (how to cut it down) #139

Closed: josepowera closed this issue 2 years ago

josepowera commented 2 years ago

Using clickhouse_sinker 2.2 / Kafka 2.8 / the new Kafka driver, with a sample record coming from Kafka in CSV format (approx. 63 bytes/record): 2,0,0,0,11,0,0,1638280052738,1,,0,1462500355397980160,0,0,,0,,,

We have millions of records in Kafka to process. However, when running this through clickhouse_sinker we need 2.3 GB of memory to process these records.

The process is not leaking memory; we just want to cut down its memory usage. We have a cold-start problem with memory when we must restart a server running 23 standalone tasks similar to this one.

Is there any special setting that could help in this situation, like setting some queue to a lower value or similar? (I believe the record above is small, and our settings "bufferSize": 10000 and "flushInterval": 5 are not excessive.) We only have a problem when there are millions of records in the Kafka backlog, and it looks like clickhouse_sinker is fetching too much, too fast from Kafka.
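(Back-of-envelope: 10,000 buffered records at roughly 63 bytes each is only about 0.6 MB per task, so the configured buffer alone cannot explain gigabytes of resident memory.)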

{
  "clickhouse": {
    "hosts": [
      [
        "clickhouse-pv-simple-19000.test-clickhouse-operator.svc.cluster.local"
      ]
    ],
    "port": 19000,
    "db": "test5dsp",
    "username": "",
    "password": "",
    "retryTimes": 0
  },
  "kafka": {
    "brokers": "kafkaoss-kafka-bootstrap.kafka.svc.cluster.local:9092",
    "version": "2.8.0"
  },
  "task": {
    "name": "csv_events_raw",
    "kafka": "kfk1",
    "topic": "dspshopeventsraw",
    "earliest": true,
    "consumerGroup": "chrawctb6",
    "parser": "csv",
    "csvFormat": [
      "tenantid",
      "advertiserid",
      "creativeid",
      "campaignid",
      "shopid",
      "categoryid",
      "productid",
      "eventtime",
      "shopeventtype",
      "auctionid",
      "impressionid",
      "userid",
      "recommendationreason",
      "recommendationreasonsource",
      "uemh",
      "productprice",
      "crtype",
      "mediatype",     
      "geocountry3"      
    ],
    "delimiter": ",",
    "clickhouse": "ch1dsp",
    "tableName": "dspshop_events_raw_raw",
    "@desc_of_autoSchema": "auto schema will auto fetch the schema from clickhouse",
    "autoSchema": true,
    "@desc_of_exclude_columns": "this columns will be excluded by insert SQL",
    "excludeColumns": [
      "eventDate",
      "eventTimeStamp"
    ],
    "bufferSize": 10000,
    "flushInterval": 5
  },
  "logLevel": "info"
}
go tool pprof --inuse_objects http://localhost:2120/debug/pprof/heap
Fetching profile over HTTP from http://localhost:2120/debug/pprof/heap
Saved profile in /home/uporabnik/pprof/pprof.clickhouse_sinker.alloc_objects.alloc_space.inuse_objects.inuse_space.001.pb.gz
File: clickhouse_sinker
Type: inuse_objects
Time: Dec 5, 2021 at 11:47pm (UTC)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 2217288, 95.80% of 2314588 total
Dropped 78 nodes (cum <= 11572)
Showing top 10 nodes out of 35
      flat  flat%   sum%        cum   cum%
   1325465 57.27% 57.27%    1325465 57.27%  github.com/twmb/franz-go/pkg/kgo.recordToRecord
    379789 16.41% 73.67%     379789 16.41%  encoding/csv.(*Reader).readRecord
    163842  7.08% 80.75%     163842  7.08%  github.com/housepower/clickhouse_sinker/parser.(*CsvMetric).GetInt
     65537  2.83% 83.58%      65537  2.83%  database/sql.(*Tx).grabConn
     65537  2.83% 86.42%      65537  2.83%  github.com/housepower/clickhouse_sinker/parser.(*CsvMetric).GetString
     53274  2.30% 88.72%     326344 14.10%  github.com/housepower/clickhouse_sinker/model.MetricToRow
     49152  2.12% 90.84%      49153  2.12%  github.com/housepower/clickhouse_sinker/task.(*Service).put
     43691  1.89% 92.73%      43691  1.89%  github.com/housepower/clickhouse_sinker/model.GetRow
     38232  1.65% 94.38%      87385  3.78%  github.com/housepower/clickhouse_sinker/input.(*KafkaFranz).Run.func2
     32769  1.42% 95.80%     439350 18.98%  github.com/housepower/clickhouse_sinker/parser.(*CsvParser).Parse
(pprof) list initMappings
Total: 2314588
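
Most of the live objects in the profile sit in kgo.recordToRecord, i.e. records already fetched from Kafka but not yet flushed, which is consistent with the client fetching ahead faster than the sinker drains. Prometheus metrics from the same process: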
# HELP clickhouse_sinker_consume_msgs_total total num of consumed msgs
# TYPE clickhouse_sinker_consume_msgs_total counter
clickhouse_sinker_consume_msgs_total{task="csv_dsp_dspshop_events_raw"} 4.6138709e+07
# HELP clickhouse_sinker_consume_offsets last committed offset for each topic partition pair
# TYPE clickhouse_sinker_consume_offsets gauge
clickhouse_sinker_consume_offsets{partition="0",task="csv_dsp_dspshop_events_raw",topic="dspshopeventsraw"} 4.28276248e+08
clickhouse_sinker_consume_offsets{partition="1",task="csv_dsp_dspshop_events_raw",topic="dspshopeventsraw"} 4.33094655e+08
# HELP clickhouse_sinker_flush_msgs_total total num of flushed msgs
# TYPE clickhouse_sinker_flush_msgs_total counter
clickhouse_sinker_flush_msgs_total{task="csv_dsp_dspshop_events_raw"} 4.6060378e+07
# HELP clickhouse_sinker_parsing_pool_backlog GlobalParsingPool backlog
# TYPE clickhouse_sinker_parsing_pool_backlog gauge
clickhouse_sinker_parsing_pool_backlog{task="csv_dsp_dspshop_events_raw"} 15744
# HELP clickhouse_sinker_ring_msgs num of msgs in ring
# TYPE clickhouse_sinker_ring_msgs gauge
clickhouse_sinker_ring_msgs{task="csv_dsp_dspshop_events_raw"} 29179
# HELP clickhouse_sinker_ring_normal_batchs_total total num of normal batches generated
# TYPE clickhouse_sinker_ring_normal_batchs_total counter
clickhouse_sinker_ring_normal_batchs_total{task="csv_dsp_dspshop_events_raw"} 2879
# HELP clickhouse_sinker_writing_pool_backlog GlobalWritingPool backlog
# TYPE clickhouse_sinker_writing_pool_backlog gauge
clickhouse_sinker_writing_pool_backlog{task="csv_dsp_dspshop_events_raw"} 3
# HELP go_build_info Build information about the main Go module.
# TYPE go_build_info gauge
go_build_info{checksum="",path="github.com/housepower/clickhouse_sinker",version="(devel)"} 1
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.000127051
go_gc_duration_seconds{quantile="0.25"} 0.000383059
go_gc_duration_seconds{quantile="0.5"} 0.000847179
go_gc_duration_seconds{quantile="0.75"} 0.003243602
go_gc_duration_seconds{quantile="1"} 0.441334766
go_gc_duration_seconds_sum 6.032537151
go_gc_duration_seconds_count 468
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 37
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.17.3"} 1
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
# TYPE go_memstats_alloc_bytes gauge
go_memstats_alloc_bytes 1.421666816e+09
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
# TYPE go_memstats_alloc_bytes_total counter
go_memstats_alloc_bytes_total 4.25391505824e+11
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
# TYPE go_memstats_buck_hash_sys_bytes gauge
go_memstats_buck_hash_sys_bytes 1.570621e+06
# HELP go_memstats_frees_total Total number of frees.
# TYPE go_memstats_frees_total counter
go_memstats_frees_total 2.04069261e+09
# HELP go_memstats_gc_cpu_fraction The fraction of this program's available CPU time used by the GC since the program started.
# TYPE go_memstats_gc_cpu_fraction gauge
go_memstats_gc_cpu_fraction 0.09415253920496372
# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
# TYPE go_memstats_gc_sys_bytes gauge
go_memstats_gc_sys_bytes 9.7379984e+07
# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
# TYPE go_memstats_heap_alloc_bytes gauge
go_memstats_heap_alloc_bytes 1.421666816e+09
# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
# TYPE go_memstats_heap_idle_bytes gauge
go_memstats_heap_idle_bytes 9.12883712e+08
# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
# TYPE go_memstats_heap_inuse_bytes gauge
go_memstats_heap_inuse_bytes 1.468399616e+09
# HELP go_memstats_heap_objects Number of allocated objects.
# TYPE go_memstats_heap_objects gauge
go_memstats_heap_objects 6.023778e+06
# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
# TYPE go_memstats_heap_released_bytes gauge
go_memstats_heap_released_bytes 2.20250112e+08
# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
# TYPE go_memstats_heap_sys_bytes gauge
go_memstats_heap_sys_bytes 2.381283328e+09
# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
# TYPE go_memstats_last_gc_time_seconds gauge
go_memstats_last_gc_time_seconds 1.6387489131151752e+09
# HELP go_memstats_lookups_total Total number of pointer lookups.
# TYPE go_memstats_lookups_total counter
go_memstats_lookups_total 0
# HELP go_memstats_mallocs_total Total number of mallocs.
# TYPE go_memstats_mallocs_total counter
go_memstats_mallocs_total 2.046716388e+09
# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures.
# TYPE go_memstats_mcache_inuse_bytes gauge
go_memstats_mcache_inuse_bytes 6000
# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system.
# TYPE go_memstats_mcache_sys_bytes gauge
go_memstats_mcache_sys_bytes 16384
# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures.
# TYPE go_memstats_mspan_inuse_bytes gauge
go_memstats_mspan_inuse_bytes 1.7415344e+07
# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system.
# TYPE go_memstats_mspan_sys_bytes gauge
go_memstats_mspan_sys_bytes 3.137536e+07
# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place.
# TYPE go_memstats_next_gc_bytes gauge
go_memstats_next_gc_bytes 1.9475288e+09
# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations.
# TYPE go_memstats_other_sys_bytes gauge
go_memstats_other_sys_bytes 4.820459e+06
# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator.
# TYPE go_memstats_stack_inuse_bytes gauge
go_memstats_stack_inuse_bytes 1.081344e+06
# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator.
# TYPE go_memstats_stack_sys_bytes gauge
go_memstats_stack_sys_bytes 1.081344e+06
# HELP go_memstats_sys_bytes Number of bytes obtained from system.
# TYPE go_memstats_sys_bytes gauge
go_memstats_sys_bytes 2.51752748e+09
# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 13
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 1878.43
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 1.048576e+06
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 15
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 2.304339968e+09
# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1.63874716946e+09
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 3.22105344e+09
# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes.
# TYPE process_virtual_memory_max_bytes gauge
process_virtual_memory_max_bytes 1.8446744073709552e+19
# HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served.
# TYPE promhttp_metric_handler_requests_in_flight gauge
promhttp_metric_handler_requests_in_flight 1
# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
# TYPE promhttp_metric_handler_requests_total counter
promhttp_metric_handler_requests_total{code="200"} 469
promhttp_metric_handler_requests_total{code="500"} 0
promhttp_metric_handler_requests_total{code="503"} 0
josepowera commented 2 years ago

OK, I think I might have found a solution... Could these two variables be exposed in the config?

In kafka_franz.go:

```go
kgo.FetchMaxBytes(1 << 27),      // 134 MB
kgo.BrokerMaxReadBytes(1 << 27), // 134 MB
```

From the franz-go config:

// FetchMaxBytes sets the maximum amount of bytes a broker will try to send
// during a fetch, overriding the default 50MiB. Note that brokers may not obey
// this limit if it has records larger than this limit. Also note that this
// client sends a fetch to each broker concurrently, meaning the client will
// buffer up to <brokers * max bytes> worth of memory.
//
// This corresponds to the Java fetch.max.bytes setting.
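With the hard-coded 1 << 27 above, the client may buffer up to ~134 MB of raw fetch data per broker; once decoded into kgo records and parsed rows, the in-memory footprint plausibly grows several-fold, which lines up with the heap profile above.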

// BrokerMaxReadBytes sets the maximum response size that can be read from
// Kafka, overriding the default 100MiB.
//
// This is a safety measure to avoid OOMing on invalid responses. This is
// slightly double FetchMaxBytes; if bumping that, consider bumping this. No
// other response should run the risk of hitting this limit.
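
For illustration, a minimal sketch (reusing the broker, topic, and consumer group from the config above) of how a franz-go client could be built with these two limits lowered. The 16 MiB / 32 MiB values are hypothetical, not values clickhouse_sinker ships with:

```go
package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Hypothetical caps: 16 MiB fetches instead of the 128 MiB (1 << 27)
	// currently hard-coded in kafka_franz.go. Per the franz-go docs the
	// client may buffer up to <brokers * FetchMaxBytes>, so lowering this
	// bounds fetch-side memory at the cost of fetch throughput.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("kafkaoss-kafka-bootstrap.kafka.svc.cluster.local:9092"),
		kgo.ConsumerGroup("chrawctb6"),
		kgo.ConsumeTopics("dspshopeventsraw"),
		kgo.FetchMaxBytes(16<<20),      // per-broker fetch cap (hypothetical value)
		kgo.BrokerMaxReadBytes(32<<20), // keep roughly double FetchMaxBytes, as the docs advise
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	// ... hand cl to the consume loop as usual.
}
```

If exposed in the sinker's config, these options could default to the current 1 << 27 so existing deployments keep their behavior.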