vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

vector panics if `buffer.max_events` is 0 #11770

Closed jerome-kleinen-kbc-be closed 2 years ago

jerome-kleinen-kbc-be commented 2 years ago

Problem

During startup, vector panics with the following message:

thread 'main' panicked at 'mpsc bounded channel requires buffer > 0', /cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/sync/mpsc/bounded.rs:109:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Configuration

The config is rather straightforward: kafka and internal_logs inputs, some transforms and splunk output

Version

0.20.0 official rpm

Debug Output

I can provide this as a comment later if needed.

Example Data

N/A

Additional Context

Vector running on openshift, rhel8 base with the rpm version installed via the Dockerfile.


jszwedko commented 2 years ago

Hi @jeromekleinen-kbc !

Could you provide the configuration used here? The backtrace (set RUST_BACKTRACE=1) would also be useful to see where this is happening.

jerome-kleinen-kbc-be commented 2 years ago

Hi @jszwedko,

I was in a bit of a hurry when I posted the issue as you may have been able to tell :)

Backtraces

limited

2022-03-11T08:56:36.610837Z  INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info"
2022-03-11T08:56:36.610971Z  INFO vector::app: Loading configs. paths=["/etc/vector/internal_logs.toml", "/etc/vector/test.toml"]
thread 'main' panicked at 'mpsc bounded channel requires buffer > 0', /cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/sync/mpsc/bounded.rs:109:5
stack backtrace:
   0: std::panicking::begin_panic
   1: tokio::sync::mpsc::bounded::channel
   2: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
   3: vector::topology::builder::build_pieces::{{closure}}
   4: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
   5: vector::app::Application::prepare_from_opts::{{closure}}
   6: vector::app::Application::prepare_from_opts
   7: vector::app::Application::prepare
   8: vector::main
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

full

2022-03-11T09:14:23.475671Z  INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info"
2022-03-11T09:14:23.475777Z  INFO vector::app: Loading configs. paths=["/etc/vector/internal_logs.toml", "/etc/vector/test.toml"]
thread 'main' panicked at 'mpsc bounded channel requires buffer > 0', /cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/sync/mpsc/bounded.rs:109:5
stack backtrace:
   0:     0x55fd13afb8fc - std::backtrace_rs::backtrace::libunwind::trace::h09f7e4e089375279
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
   1:     0x55fd13afb8fc - std::backtrace_rs::backtrace::trace_unsynchronized::h1ec96f1c7087094e
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
   2:     0x55fd13afb8fc - std::sys_common::backtrace::_print_fmt::h317b71fc9a5cf964
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sys_common/backtrace.rs:67:5
   3:     0x55fd13afb8fc - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::he3555b48e7dfe7f0
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sys_common/backtrace.rs:46:22
   4:     0x55fd13b57d0c - core::fmt::write::h513b07ca38f4fb1b
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/fmt/mod.rs:1149:17
   5:     0x55fd13aebb45 - std::io::Write::write_fmt::haf8c932b52111354
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/io/mod.rs:1697:15
   6:     0x55fd13afe9a0 - std::sys_common::backtrace::_print::h195c38364780a303
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sys_common/backtrace.rs:49:5
   7:     0x55fd13afe9a0 - std::sys_common::backtrace::print::hc09dfdea923b6730
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/sys_common/backtrace.rs:36:9
   8:     0x55fd13afe9a0 - std::panicking::default_hook::{{closure}}::hb2e38ec0d91046a3
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:211:50
   9:     0x55fd13afe555 - std::panicking::default_hook::h60284635b0ad54a8
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:228:9
  10:     0x55fd13aff194 - std::panicking::rust_panic_with_hook::ha677a669fb275654
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:606:17
  11:     0x55fd0f69e235 - std::panicking::begin_panic::{{closure}}::h9ffcc17d82f8df30
  12:     0x55fd0f682504 - std::sys_common::backtrace::__rust_end_short_backtrace::h29396a828774bee0
  13:     0x55fd0f1161ec - std::panicking::begin_panic::h3adeb3e48e763f73
  14:     0x55fd0fb529b3 - tokio::sync::mpsc::bounded::channel::h04017d1f4da373dd
  15:     0x55fd1058febc - <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll::h776a9000cf6e8d0c
  16:     0x55fd10b91352 - vector::topology::builder::build_pieces::{{closure}}::h953c3dbcb7d6cce2
  17:     0x55fd105bb361 - <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll::h84e4aed138b3231e
  18:     0x55fd1090a1b1 - vector::app::Application::prepare_from_opts::{{closure}}::hdb58741660fd0c76
  19:     0x55fd10902c1e - vector::app::Application::prepare_from_opts::hbcf16fdeebe0a6d6
  20:     0x55fd10901c90 - vector::app::Application::prepare::hdf68ef7d713020c9
  21:     0x55fd0f251d07 - vector::main::hf08a7d578da733ae
  22:     0x55fd0f251cb3 - std::sys_common::backtrace::__rust_begin_short_backtrace::h18b6ceb308431a3f
  23:     0x55fd0f251cc9 - std::rt::lang_start::{{closure}}::h6ead1d250160df82
  24:     0x55fd13afb4fb - core::ops::function::impls::<impl core::ops::function::FnOnce<A> for &F>::call_once::h7e688d7cdfeb7e00
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/ops/function.rs:259:13
  25:     0x55fd13afb4fb - std::panicking::try::do_call::h4be824d2350b44c9
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:406:40
  26:     0x55fd13afb4fb - std::panicking::try::h0a6fc7affbe5088d
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:370:19
  27:     0x55fd13afb4fb - std::panic::catch_unwind::h22c320f732ec805e
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panic.rs:133:14
  28:     0x55fd13afb4fb - std::rt::lang_start_internal::{{closure}}::hd38309c108fe679d
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/rt.rs:128:48
  29:     0x55fd13afb4fb - std::panicking::try::do_call::h8fcaf501f097a28e
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:406:40
  30:     0x55fd13afb4fb - std::panicking::try::h20e906825f98acc1
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:370:19
  31:     0x55fd13afb4fb - std::panic::catch_unwind::h8c5234dc632124ef
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panic.rs:133:14
  32:     0x55fd13afb4fb - std::rt::lang_start_internal::hc4dd8cd3ec4518c2
                               at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/rt.rs:128:20
  33:     0x55fd0f251da2 - main
  34:     0x7fedf7a6b493 - __libc_start_main
  35:     0x55fd0f251bba - <unknown>
  36:                0x0 - <unknown>

Config

We use two config files: one ships vector's internal logs to splunk, the other ships the actual logs from kafka to splunk.

log shipping config

# kafka1
[sources.logsource_kafka1]
  # General
  type = "kafka"
  bootstrap_servers = "${kafka1}"
  group_id = "${CONSUMER_GROUP}"
  topics = ["logsource"]
  decoding.codec = "json"

# kafka2
[sources.logsource_kafka2]
  # General
  type = "kafka"
  bootstrap_servers = "${kafka2}"
  group_id = "${CONSUMER_GROUP}"
  topics = ["logsource"]
  decoding.codec = "json"

# add timestamp
[transforms.adding_timestamp]
  type = "remap"
  inputs = ["logsource_kafka*"]
  source = '''
  .timestamp = parse_timestamp(string!(.@timestamp) + "+0000", "%FT%T%.3fZ%z") ?? now()
  '''

# send to Splunk
[sinks.logsource_syslog_splunk]
  type = "splunk_hec"
  inputs = ["adding_timestamp"]
  compression = "none"
  endpoint = "${SPLUNK_HEC_ENDPOINT}"
  host_key = "host"
  index = "testindex"
  source = "logsource"
  sourcetype = "logsource:test"
  default_token = "${SPLUNK_HEC_TOKEN}"

  # Encoding
  encoding.codec = "json"
  encoding.except_fields = ["timestamp"]

  # Healthcheck
  healthcheck.enabled = true

  # Batch
  batch.max_bytes = ${SPLUNK_BATCH_MAX_BYTES}
  batch.timeout_secs = ${SPLUNK_BATCH_TIMEOUT_SECS}

  # Buffer
  buffer.type = "memory"
  buffer.max_events = ${SPLUNK_BUFFER_MAX_EVENTS}

  # Request
  request.concurrency = ${SPLUNK_REQUEST_CONCURRENCY}
  request.timeout_secs = ${SPLUNK_REQUEST_TIMEOUT_SECS}
  request.rate_limit_num = ${SPLUNK_REQUEST_RATE_LIMIT_NUM}
  request.rate_limit_duration_secs = ${SPLUNK_REQUEST_RATE_LIMIT_DURATION_SECS}

  # Tls
  tls.ca_file = "${TLS_CA_FILE}"

Perhaps relevant: we have SPLUNK_BUFFER_MAX_EVENTS set to 0.

internal logs config

# input internal logs
[sources.internal_logs]
  type = "internal_logs"

# add some fields
[transforms.il_remap]
  type = "remap"
  inputs = ["internal_logs"]
  source = '''
  .hostname = get_hostname() ?? "unknown"
  .pillar = "{{environment}}"
  .log_source = get_env_var("LOG_SOURCE") ?? "unknown"
  .os_az = get_env_var("OS_AZ") ?? "unknown"
  .os_cluster = get_env_var("OS_CLUSTER") ?? "unknown"
  .os_namespace = get_env_var("OS_NAMESPACE") ?? "unknown"
  '''

# output to splunk
[sinks.il_splunk]
  # General
  type = "splunk_hec_logs"
  inputs = ["il_remap"]
  compression = "none"
  endpoint = "${SPLUNK_HEC_ENDPOINT}"
  host_key = "hostname"
  index = "vector"
  sourcetype = "vector:log"
  default_token = "${SPLUNK_LOG_TOKEN}"

  # Encoding
  encoding.codec = "json"

  # Healthcheck
  healthcheck.enabled = true

  # Batch
  batch.max_bytes = ${SPLUNK_BATCH_MAX_BYTES}
  batch.timeout_secs = ${SPLUNK_BATCH_TIMEOUT_SECS}

  # Buffer
  buffer.type = "memory"
  buffer.max_events = ${SPLUNK_BUFFER_MAX_EVENTS}

  # Request
  request.concurrency = ${SPLUNK_REQUEST_CONCURRENCY}
  request.timeout_secs = ${SPLUNK_REQUEST_TIMEOUT_SECS}
  request.rate_limit_num = ${SPLUNK_REQUEST_RATE_LIMIT_NUM}
  request.rate_limit_duration_secs = ${SPLUNK_REQUEST_RATE_LIMIT_DURATION_SECS}

  # Tls
  tls.ca_file = "${TLS_CA_FILE}"

I will try a couple of things:

I will comment if I figure out more.

jerome-kleinen-kbc-be commented 2 years ago

My hunch was correct: the panic occurs when buffer.max_events equals 0. Up to and including 0.19.2, a value of 0 was allowed.

Honestly, I don't remember exactly why we set this to 0 in our environment. I do think we discussed this on Discord a long time ago. Perhaps a 0 buffer actually makes no sense?

jszwedko commented 2 years ago

Hi @jeromekleinen-kbc !

Indeed, good find! Prior to 0.20 we were using a different internal channel implementation that had a de facto buffer capacity of the specified capacity (max_events) plus the number of senders (in this case, 1 per input component). The new channels have exactly the specified capacity, but it cannot be 0. We will guard against that to output a better error message.
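To make the capacity arithmetic and the proposed guard concrete, here is a minimal sketch in plain Rust. It is not Vector's actual code: `legacy_effective_capacity`, `validate_max_events`, and `ZeroCapacityError` are hypothetical names chosen for illustration, and the error text is an assumption.

```rust
use std::fmt;
use std::num::NonZeroUsize;

/// Hypothetical error returned when a buffer is configured with zero capacity.
#[derive(Debug, PartialEq, Eq)]
pub struct ZeroCapacityError {
    pub component: String,
}

impl fmt::Display for ZeroCapacityError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "buffer.max_events for `{}` must be greater than 0",
            self.component
        )
    }
}

/// Pre-0.20 behaviour as described above: the de facto capacity was the
/// configured capacity plus one slot per sender, so `max_events = 0` with a
/// single input still left room for one event.
pub fn legacy_effective_capacity(max_events: usize, senders: usize) -> usize {
    max_events + senders
}

/// Sketch of a guard that rejects 0 at configuration time, so the user gets
/// a readable error instead of a panic when the channel is constructed.
pub fn validate_max_events(
    component: &str,
    max_events: usize,
) -> Result<NonZeroUsize, ZeroCapacityError> {
    NonZeroUsize::new(max_events).ok_or_else(|| ZeroCapacityError {
        component: component.to_string(),
    })
}
```

Returning a `NonZeroUsize` means any later code that builds the channel cannot receive a zero by accident; the value would be unwrapped with `.get()` at the point of use.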

We'll also discuss whether it makes sense to disable the buffer entirely (which is what I assume you were trying to do by setting max_events to 0?). Note that setting a buffer too low can have negative performance implications because it serializes more of the execution. With a max_events of 1, the input has to wait for the sink to pull and process one event at a time, whereas buffering decouples the processing so that the sink can pull all available events.
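The effect of capacity on backpressure can be illustrated with a small sketch. It uses the standard library's `std::sync::mpsc::sync_channel` as a stand-in, not the tokio channel from the backtrace or Vector's buffer implementation, and `accepted_before_backpressure` is a hypothetical helper name. It counts how many events a producer can hand off before it would have to wait for the consumer.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Counts how many events a producer can enqueue into a bounded channel of
/// the given capacity while the consumer is not pulling anything.
/// Once the channel is full, a real producer would block (backpressure).
pub fn accepted_before_backpressure(capacity: usize) -> usize {
    // `_rx` keeps the receiving side alive so sends fail with `Full`
    // rather than `Disconnected`.
    let (tx, _rx) = sync_channel::<u64>(capacity);
    let mut accepted = 0usize;
    loop {
        match tx.try_send(accepted as u64) {
            Ok(()) => accepted += 1,
            // The buffer is full: the producer now depends on the consumer.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => return accepted,
        }
    }
}
```

With a capacity of 1 the producer stalls after every event, while a larger capacity lets it run ahead of the consumer by that many events. Unlike the tokio channel, the standard library channel accepts a capacity of 0 and treats it as a rendezvous channel, so this sketch does not reproduce the panic.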

I'm curious if you have a use-case for disabling the buffer that we might consider how to support. It seems likely that our upcoming end-to-end acknowledgements feature might be of use.

Thanks again for investigating!

jerome-kleinen-kbc-be commented 2 years ago

@jszwedko - first of all, thanks for the quick fix!

I believe the reason we opted for a buffer size of 0 was related to how we run vector. We run all our vector instances as pods in an openshift instance that enforces strict memory limits. The idea was that with a low buffer size, we would lose fewer events if our pod gets forcefully killed when reaching the memory limit.

In the meantime, we have put a solution in place where we run vector as a child process of https://github.com/grosser/preoomkiller that sends a sigkill to vector when 80% of the allowed memory is reached. However, we have noticed that sometimes this simply isn't sufficient: either vector allocates memory so fast that it takes up the final 20% almost instantly, or vector keeps allocating memory despite the sigkill signal (e.g. for vrl).

The idea here is that vector would receive a sigkill, would stop reading new logs but would still process all events in the pipelines to the sinks. Recently there was a related issue regarding the fixed 60 second shutdown time. After vector shuts down, it starts again and we try again.

Long story short: one thing we still struggle with is running vector in memory-constrained environments. It would be awesome if vector could do this natively. In kubernetes (and similar environments like openshift) the memory limit and memory usage of each process are exposed in files (see preoomkiller), so perhaps vector could read these files and, when a configurable percentage is reached, simulate backpressure to the sources to limit event inflow and thus keep memory under control.
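A rough sketch of that idea follows, under stated assumptions: it is not an existing Vector feature, all function names are hypothetical, and it assumes the cgroup v2 layout in which a directory such as /sys/fs/cgroup contains `memory.current` and `memory.max` (cgroup v1 uses different file names). The 0.8 threshold in the usage note mirrors the 80% mentioned above.

```rust
use std::fs;
use std::path::Path;

/// Parses the contents of a cgroup memory file as a byte count.
/// Returns `None` for the literal "max" (no limit) or any unparsable text.
pub fn parse_bytes(raw: &str) -> Option<u64> {
    raw.trim().parse().ok()
}

/// Computes usage as a fraction of the limit, if both values are known.
pub fn memory_usage_ratio(current_raw: &str, limit_raw: &str) -> Option<f64> {
    let current = parse_bytes(current_raw)?;
    let limit = parse_bytes(limit_raw)?;
    if limit == 0 {
        return None;
    }
    Some(current as f64 / limit as f64)
}

/// Decides whether sources should be slowed down. When the ratio is unknown
/// (for example, no limit is set), no backpressure is applied.
pub fn should_apply_backpressure(current_raw: &str, limit_raw: &str, threshold: f64) -> bool {
    memory_usage_ratio(current_raw, limit_raw).map_or(false, |ratio| ratio >= threshold)
}

/// Reads the two cgroup v2 files from `dir` and returns the usage ratio.
/// Returns `None` if either file is missing or unreadable.
pub fn read_cgroup_v2_ratio(dir: &Path) -> Option<f64> {
    let current = fs::read_to_string(dir.join("memory.current")).ok()?;
    let limit = fs::read_to_string(dir.join("memory.max")).ok()?;
    memory_usage_ratio(&current, &limit)
}
```

A caller could poll `read_cgroup_v2_ratio(Path::new("/sys/fs/cgroup"))` periodically and pause reads from the sources whenever the result is at or above 0.8. How that pause would propagate through a topology is left open here.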

As a side note, I also haven't found a way to restrict the kafka source (using librdkafka settings) from trying to read too many events all at once. If we haven't read a topic for a while and we resume processing, we can see vector/librdkafka attempting to eliminate the lag as fast as possible leading to a surge in memory usage (and thus many out of memory kills). I looked at the throttle transform, but this would drop any events above a certain limit, which is not what I want to do.

jszwedko commented 2 years ago

Hi @jeromekleinen-kbc !

Thank you for the details!

Having Vector be aware of available memory is a topic that has come up before. https://github.com/vectordotdev/vrl/issues/82 is the latest thought around that but I couldn't find a dedicated issue for it so I created https://github.com/vectordotdev/vector/issues/11942. Feel free to add any more details or thoughts there.

> As a side note, I also haven't found a way to restrict the kafka source (using librdkafka settings) from trying to read too many events all at once. If we haven't read a topic for a while and we resume processing, we can see vector/librdkafka attempting to eliminate the lag as fast as possible leading to a surge in memory usage (and thus many out of memory kills). I looked at the throttle transform, but this would drop any events above a certain limit, which is not what I want to do.

Would you want to spin off a separate issue for this? I'm not super familiar with our kafka source implementation, but I can definitely imagine a situation where it would consume as quickly as possible until it hits backpressure pushing the events downstream. Do you have any large buffers or batch configuration for the sinks? Would you want to include those details in a new issue?