Thanks for this report, @djrodgerspryor. I agree this should not be happening. Could we see the full config? I am wondering which of the other remap settings might be in play here.
Of course. Here's our full vector.toml (minus many lines of tests):
### Global config ###
log_schema.host_key = "vector_host"
acknowledgements.enabled = true
# Where vector keeps track of its internal state
# This is the default, but let's be explicit
data_dir = "/var/lib/vector"
# TODO: I couldn't get this working (see: https://github.com/vectordotdev/vector/issues/15414)
# so we use an ugly systemd wrapper script instead. We should switch back to the secret backend
# when this bug gets fixed.
# # We use vector's 'secret' config feature to get the delivery stream name dynamically
# # from a script, even though it's not actually secret
# [secret]
# [secret.imds]
# type = "exec"
# command = ["/etc/vector/get_dynamic_log_data.rb"]
### Sources ###
[sources.vector_internal_logs]
type = "internal_logs"
[sources.cloud_init_logs]
type = "file"
include = ["/var/log/cloud-init-output.log"]
read_from = "beginning"
# All host and kernel logs
[sources.journald]
type = "journald"
current_boot_only = true
[sources.syslog_socket]
type = "syslog"
max_length = 134217728 # 128MiB
mode = "unix"
# We make an extra socket that can be easily mounted into docker containers (because it's not
# in /dev) which some containers want to use.
# Our base image will also symlink /dev/log to this for anything on the host which wants to
# write there.
path = "/var/run/rsyslog/log.sock"
[sources.syslog_localhost_tcp]
type = "syslog"
address = "127.0.0.1:514"
max_length = 134217728 # 128MiB
mode = "tcp"
[sources.syslog_localhost_udp]
type = "syslog"
address = "127.0.0.1:514"
max_length = 134217728 # 128MiB
mode = "udp"
[sources.syslog_docker_bridge_tcp]
type = "syslog"
address = "172.17.0.1:514"
max_length = 134217728 # 128MiB
mode = "tcp"
[sources.syslog_docker_bridge_udp]
type = "syslog"
address = "172.17.0.1:514"
max_length = 134217728 # 128MiB
mode = "udp"
# Annoyingly, Meilisearch doesn't write logs to a file, so we pull them out of stdout with docker
[sources.docker_meilisearch_logs]
type = "docker_logs"
# Include the container by either name or label
include_containers = ["compose_meilisearch_"]
include_labels = ["SERVICE_TAGS=progname__meilisearch"]
# Allow another vector instance running inside a docker container to send us logs
[sources.vector_docker_bridge]
type = "vector"
# 44255 is a random port that I've decided to use for vector
address = "172.17.0.1:44255"
# Allow data from another vector instance from anywhere on this box
[sources.vector_host]
type = "vector"
# 44255 is a random port that I've decided to use for vector
address = "127.0.0.1:44255"
### Transforms ###
[transforms.tagged_vector]
type = "remap"
inputs = ["vector_docker_bridge", "vector_host"]
source = """
# Expect logapp name and service to come with each message
"""
[transforms.tagged_docker_meilisearch_logs]
type = "remap"
inputs = ["docker_meilisearch_logs"]
source = """
# Best effort to parse the log message
. |= parse_regex(.message, r'\\[(?P<timestamp>[^ ]*)\\s+(?P<level_name>[^ ]*)\\s+(?P<source_code_module>[^ ]*)\\]\\s+(?P<remote_host>[^ ]*)\\s+"(?P<method>[^ ]*)\\s+(?P<path>[^ ]*)\\s+(?P<protocol>[^ ]*)"\\s+(?P<status>[^ ]*)\\s+(?P<response_size_bytes>[^ ]*)\\s+(?P<host>[^ ]*)\\s+"(?P<user_agent_raw>.*?)"\\s+(?P<response_time>\\d.\\d+)') ?? .
# Remap nested fields
.source_code.module = del(.source_code_module)
.user_agent.raw = del(.user_agent_raw)
# Cast types
.timestamp = to_timestamp(.timestamp) ?? now()
.status = to_int(.status) ?? .status
.response_size_bytes = to_int(.response_size_bytes) ?? .response_size_bytes
.response_time = to_float(.response_time) ?? .response_time
# Add metadata
.logapp_name = "meilisearch"
.service = "meilisearch"
"""
[transforms.tagged_vector_internal_logs]
type = "remap"
inputs = ["vector_internal_logs"]
source = """
# Downcase all keys
. = map_keys(.) -> |key| {
if is_string(key) {
downcase(key)
} else {
key
}
}
.logapp_name = "host"
.service = "vector"
.level_name = .metadata.level
# Convert internal log levels into a syslog like numerical level field
if .level_name == "TRACE" {
.level = 7
} else if .level_name == "DEBUG" {
.level = 6
} else if .level_name == "INFO" {
.level = 6
} else if .level_name == "WARN" {
.level = 4
} else if .level_name == "ERROR" {
.level = 3
}
"""
[transforms.tagged_cloud_init_logs]
type = "remap"
inputs = ["cloud_init_logs"]
source = """
# Downcase all keys
. = map_keys(.) -> |key| {
if is_string(key) {
downcase(key)
} else {
key
}
}
.logapp_name = "host"
.service = "cloud-init"
"""
[transforms.tagged_journald]
type = "remap"
inputs = ["journald"]
source = """
# Downcase all keys and remove a leading underscore from them (which kernel logs like to include for some reason)
. = map_keys(.) -> |key| {
if is_string(key) {
replace(downcase(key), r'^_', "", 1)
} else {
key
}
}
.logapp_name = "host"
.service = .syslog_identifier || .systemd_unit || .comm
# Convert syslog fields to a log 'level' key which is what we commonly use for filtering
if .syslog_facility == null {
.syslog_facility = del(.syslogfacility)
}
.level = .level || .syslog_severity || .syslogseverity || .priority
"""
[transforms.tagged_syslog]
type = "remap"
inputs = ["syslog_*"]
source = """
if .appname == "rabbitmq" {
# Remove the connection id at the start of the message so that we can parse it as JSON
.message = replace(.message, r'^\\s*<[^>]*>\\s*-\\s*\\{', "{") ?? .message
}
# If the message is a valid JSON object, then merge it into the main log object.
# Stileapp and Rabbit both pass JSON into syslog like this
parsed_message, err = object(parse_json(.message) ?? null)
if err == null {
# Merge parsed message into root
. = merge(., parsed_message)
}
# Fix legacy camelCasing
if .request_log == null { .request_log = del(.requestLog) }
if .amqp_routing_key == null { .amqp_routing_key = del(.amqpRoutingKey) }
if .user_agent == null { .user_agent = del(.userAgent) }
if .content_length == null { .content_length = del(.contentLength) }
if .client_ip_address == null { .client_ip_address = del(.clientIpAddress) }
if .client_local_forwarded_for_ip_addresses == null { .client_local_forwarded_for_ip_addresses = del(.clientLocalForwardedForIpAddresses) }
if .client_tab_id == null { .client_tab_id = del(.clientTabId) }
if .is_internal_admin_request == null { .is_internal_admin_request = del(.isInternalAdminRequest) }
if .content_length_header == null { .content_length_header = del(.contentLengthHeader) }
if .global_correlation_id == null { .global_correlation_id = del(.globalCorrelationId) }
if .is_internal_admin_request == null { .is_internal_admin_request = del(.isInternalAdminRequest) }
if .open_trace_id == null { .open_trace_id = del(.openTraceId) }
if .view_trace_link == null { .view_trace_link = del(.viewTraceLink) }
# Simply downcase all remaining keys
. = map_keys(.) -> |key| {
if is_string(key) {
downcase(key)
} else {
key
}
}
# Convert syslog fields to a log 'level' key which is what we commonly use for filtering
if .syslog_facility == null {
.syslog_facility = del(.syslogfacility)
}
.level = .level || .syslog_severity || .syslogseverity || .severity || .priority
# Set a timestamp field
.timestamp = to_timestamp(.timestamp || .time || .ts_utc || .@timestamp) ?? now()
# Pick the first non-empty, non-null value
.logapp_name = compact([.logapp_name, .appname, .service, .program, .facility, .progname, "unknown-syslog"])[0]
# Consul prepends @ to various fields for some reason
if .logapp_name == "consul" {
if is_string(.@message) { .message = del(.@message) }
if is_string(.@level) { .level = del(.@level) }
if is_timestamp(to_timestamp(.@timestamp) ?? null) { .timestamp = del(.@timestamp) }
if is_string(.@module) { .consul_module = del(.@module) }
}
# Rabbit has its own weird field names
if .logapp_name == "rabbitmq" {
if is_string(.msg) { .message = del(.msg) }
if is_timestamp(to_timestamp(.time) ?? null) { .timestamp = del(.time) }
}
"""
[transforms.all_logs]
type = "remap"
inputs = ["tagged_*"]
source = """
# Ensure that .level is an integer. Otherwise, move it to .level_name
if .level != null {
level_int, err = to_int(.level)
if err == null {
.level = level_int
} else {
.level_name = del(.level)
}
}
# Normalise level name
if is_string(.level_name) {
.level_name = upcase(.level_name) ?? .level_name
# Canonicalize some level names
if .level_name == "WARNING" {
.level_name = "WARN"
} else if .level_name == "CRITICAL" || .level_name == "CRIT" {
.level_name = "FATAL"
} else if .level_name == "PANIC" {
.level_name = "EMERG"
}
}
# Convert level name to level if level isn't set
if .level == null && .level_name != null {
# Try converting the level name back into an integer log level
if .level_name == "TRACE" || .level_name == "DEBUG" {
.level = 7
} else if .level_name == "INFO" {
.level = 6
} else if .level_name == "NOTICE" {
.level = 5
} else if .level_name == "WARN" {
.level = 4
} else if .level_name == "ERROR" {
.level = 3
} else if .level_name == "FATAL" {
.level = 2
} else if .level_name == "ALERT" {
.level = 1
} else if .level_name == "EMERG" {
.level = 0
}
}
# Add canonical level names if they're not already set
if .level != null && .level_name == null {
if .level == 7 {
.level_name = "DEBUG"
} else if .level == 6 {
.level_name = "INFO"
} else if .level == 5 {
.level_name = "NOTICE"
} else if .level == 4 {
.level_name = "WARN"
} else if .level == 3 {
.level_name = "ERROR"
} else if .level == 2 {
.level_name = "FATAL"
} else if .level == 1 {
.level_name = "ALERT"
} else if .level == 0 {
.level_name = "EMERG"
}
}
# Ensure as much AWS metadata as we can
if .aws == null {
.aws = {}
}
if .aws.availability_zone == null {
.aws.availability_zone = "${AVAILABILITY_ZONE:?AWS availability zone must be provided for this environment!}"
}
if .aws.region == null {
.aws.region = "${REGION:?AWS region must be provided for this environment!}"
}
if .aws.instance_id == null {
.aws.instance_id = "${INSTANCE_ID:?AWS instance id must be provided for this environment!}"
}
if .aws.instance_type == null {
.aws.instance_type = "${INSTANCE_TYPE:?AWS instance type must be provided for this environment!}"
}
if .aws.account_id == null {
.aws.account_id = "${ACCOUNT_ID:?AWS account id must be provided for this environment!}"
}
# Ensure as much host metadata as we can
if !is_object(.host) {
.host = {}
}
if .host.ip_address == null {
.host.ip_address = "${IP_ADDRESS:?IP address must be provided for this environment!}"
}
if .host.public_ip_address == null {
public_ip = "${PUBLIC_IP_ADDRESS}"
if !is_empty(public_ip) {
.host.public_ip_address = public_ip
}
}
if .host.hostname == null {
hostname, err = get_hostname()
if err == null {
.host.hostname = hostname
}
}
# Ensure stack id
if .stack_id == null {
# Get it from the environment (provided by the init.sh script via get_dynamic_log_data.rb)
stack_id = "${STACK_ID}"
if !is_empty(stack_id) {
.stack_id = stack_id
}
}
# Sanity check the timestamp field
.timestamp = timestamp(.timestamp) ?? now()
if (.timestamp > now() ?? true) || (.timestamp < to_timestamp(to_unix_timestamp(now()) - 2592000, "seconds") ?? true) {
if .testonly_dont_change_timestamp != true {
# Sometimes tests stub time and produce crazy timestamp values in the future or the
# distant past, so we correct those to the current time. 2592000 is the number of seconds in
# a month.
.timestamp = now()
}
}
# Ensure that we have a logapp_name
.logapp_name = string(.logapp_name) ?? "unknown"
# Normalise the logapp name to be only lower case letters, numbers and dashes
.logapp_name = replace(downcase(.logapp_name), r'(?i)[^a-z0-9\\-\\.]+', "-")
# Normalise the name of error log fields
if .error == null && is_object(.exception) && is_string(.exception.message) {
.error = del(.exception)
}
if .error == null && is_object(.e) && is_string(.e.message) {
.error = del(.e)
}
# Make a nice link to all requests with the same GCID
if .global_correlation_id != null {
# Convert the log TS to unix to make it easy to manipulate
unix_timestamp = to_unix_timestamp(.timestamp)
.link_request_logs = join([
"https://stile.chaossearch.io/#/analytics?pathname=%252Fkibana%252Fapp%252Fdiscover&kibana=%2523%252F%253F_g%253D(filters%253A!()%252CrefreshInterval%253A(pause%253A!t%252Cvalue%253A0)%252Ctime%253A(from%253A'",
# From one hour ago.
# Use a custom format string so that we can replace colons with the double-uri escaped colons (%253A) which kibana expects
format_timestamp(to_timestamp(unix_timestamp - 3600) ?? .timestamp, format: "%Y-%m-%dT%H%%253A%M%%253A%S%.3fZ") ?? .timestamp,
"'%252Cto%253A'",
# To one hour in the future
# Use a custom format string so that we can replace colons with the double-uri escaped colons (%253A) which kibana expects
# Chose the correct index for the environment based on ACCOUNT_SHORTNAME.
# When we get secrets working, use this rather than the ACCOUNT_SHORTNAME env var: SECRETTODO[imds.account_shortname]
format_timestamp(to_timestamp(unix_timestamp + 3600) ?? .timestamp, format: "%Y-%m-%dT%H%%253A%M%%253A%S%.3fZ") ?? .timestamp,
"'))%2526_a%253D(columns%253A!(_source)%252Cfilters%253A!()%252Cindex%253A${ACCOUNT_SHORTNAME:?AWS account shortname must be provided for this environment! It should look like dev|corp|prod etc.}-all-logs-view%252Cinterval%253Aauto%252Cquery%253A(language%253Alucene%252Cquery%253A'global_correlation_id%253A%252522",
.global_correlation_id,
"%252522')%252Csort%253A!())"
], "") ?? null
}
# Useful admin tools links
if (.principal || .principal_name) != null {
.link_admin_tool_user = join(["https://stileapp.com/auinternal/internalAdmin/users/", (.principal || .principal_name)]) ?? null
}
if .institution_id != null {
.link_admin_tool_institution = join(["https://stileapp.com/auinternal/internalAdmin/institutions/", to_string(.institution_id) ?? ""]) ?? null
}
if .subject_id != null {
.link_admin_tool_subject = join(["https://stileapp.com/auinternal/internalAdmin/subjects/", to_string(.subject_id) ?? ""]) ?? null
}
if (.activity_id || .lesson_item_id) != null {
.link_admin_tool_activity = join(["https://stileapp.com/auinternal/internalAdmin/activities/", to_string(.activity_id || .lesson_item_id) ?? ""]) ?? null
}
"""
### Sinks ###
[sinks.firehose_logs]
type = "aws_kinesis_firehose"
inputs = ["all_logs"]
# TODO: use secrets instead of a wrapper script providing an env variable as soon as the bug is fixed (see: https://github.com/vectordotdev/vector/issues/15414)
# stream_name = 'SECRETTODO[imds.delivery_stream_name]'
stream_name = "${DELIVERY_STREAM_NAME:?Firehose stream name must be provided for this environment! It should look like 'delivery-stream-logs-<dev|corp|prod etc.>-apse2'}"
encoding.codec = "json"
### Metrics ###
[sources.vector_internal_metrics]
type = "internal_metrics"
# We already have the prometheus node exporter, but I'm interested to see if we can replace it with vector, so
# let's collect the host metrics too
[sources.host_metrics]
type = "host_metrics"
scrape_interval_secs = 15
# Define a prometheus scrape endpoint
[sinks.prometheus_export]
type = "prometheus_exporter"
inputs = [
"vector_internal_metrics",
"host_metrics",
# Include any metrics forwarded from another vector instance
"vector_docker_bridge",
"vector_host"
]
address = "0.0.0.0:9111" # Chose 9111 as the canonical port for vector metrics on each instance
buffer.when_full = "drop_newest" # Don't let metrics pile-up
It could be related to accepting logs from another vector instance. The place where I'm seeing this crash frequently is the only place where we actually use the sources.vector_docker_bridge source.
We are investigating this as a bug in the VRL compiler. In the meantime, however, we noted that there is actually a second replace(downcase(…)) combination, in the tagged_journald transform:
# Downcase all keys and remove a leading underscore from them (which kernel logs like to include for some reason)
. = map_keys(.) -> |key| {
if is_string(key) {
replace(downcase(key), r'^_', "", 1)
} else {
key
}
}
This is more likely to be the actual culprit, as our closure support has had some issues in the past. Could you try setting drop_on_error on this transform to see if it makes any difference for this behavior?
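For clarity, that is a top-level option on the transform definition; a minimal sketch of the suggestion (remap source elided):
[transforms.tagged_journald]
type = "remap"
inputs = ["journald"]
# Drop events whose remap program errors instead of passing them through unchanged
drop_on_error = true
source = """
# ... unchanged map_keys logic from above ...
"""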
Note that the key here should always be a string, so the is_string guard shouldn't be needed.
I tried adding drop_on_error to all the transforms, but it didn't help. The is_string check in map_keys was a bit of paranoia I added when trying to debug this issue; I agree that it shouldn't be necessary.
I assumed that the replace(downcase(...)) call from all_logs is at fault because that's where the error log comes from:
ERROR transform{component_kind="transform" component_id=all_logs component_type=remap component_name=all_logs}
Yes, you're right, it does appear to be coming from that all_logs transform. We are baffled, though, as to what might be causing this bug and need to get more info out of Vector. Are you able to build Vector from source? If not, we'll add a hook into the next nightly to get at the details to try to make a reproducer.
Yep. I've shipped an optimized debug build with this patch: https://github.com/vectordotdev/vector/compare/master...StileEducation:vector:master
And confirmed that vector thinks the VRL program is non-fallible and non-abortable:
Apr 28 23:41:28 rds-log-shipper-i-07e8dc1ecc9c6cf77.au.s522.net vector[26665]: 2023-04-28T23:41:28.511144Z WARN transform{component_kind="transform" component_id=all_logs component_type=remap component_name=all_logs}: vector::transforms::remap: Unexpected VRL error encountered: event has been dropped. program_is_falible=false, program_is_abortable=false, drop=false, reroute_dropped=false
Is there any logging you'd like me to add in the VRL executor itself to work out how it's erroring?
Oh, that is awesome, thanks a lot!
Our suggestion would be to add an extra clone of the original event right at the top of fn transform so as to be able to log that original event where the error hits. That should give us the last piece of the data to reproduce it.
Aha. It looks like it's caused by metric data getting into that VRL transform. I think these metrics come from the vector_docker_bridge source via the tagged_vector transform.
Apr 29 02:05:24 rds-log-shipper-i-033de8255f1431aea.au.s522.net vector[22716]: 2023-04-29T02:05:24.597469Z WARN transform{component_kind="transform" component_id=all_logs component_type=remap component_name=all_logs}: vector::transforms::remap: Unexpected VRL error encountered: event has been dropped. program_is_falible=false, program_is_abortable=false, drop=false, reroute_dropped=false, event=Metric(Metric { series: MetricSeries { name: MetricName { name: "rds_log_count", namespace: None }, tags: Some(MetricTags({"engine": Single(Value("mysql")), "id": Single(Value("live-db-v13")), "role": Single(Value("primary")), "type": Single(Value("audit"))})) }, data: MetricData { time: MetricTime { timestamp: Some(2023-04-29T02:03:00Z), interval_ms: None }, kind: Incremental, value: Counter { value: 1.0 } }, metadata: EventMetadata { value: Object({}), secrets: {}, finalizers: EventFinalizers([EventFinalizer { status: AtomicCell { value: Delivered }, batch: BatchNotifier(OwnedBatchNotifier { status: AtomicCell { value: Delivered }, notifier: Some(Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: true, is_tx_task_set: false } }) }) }) }]), schema_definition: Definition { event_kind: Kind { bytes: None, integer: None, float: None, boolean: None, timestamp: None, regex: None, null: None, undefined: None, array: None, object: Some(Collection { known: {}, unknown: Unknown(Infinite(Infinite { bytes: Some(()), integer: Some(()), float: Some(()), boolean: Some(()), timestamp: Some(()), regex: Some(()), null: Some(()), array: Some(()), object: Some(()) })) }) }, metadata_kind: Kind { bytes: None, integer: None, float: None, boolean: None, timestamp: None, regex: None, null: None, undefined: None, array: None, object: Some(Collection { known: {}, unknown: Unknown(Infinite(Infinite { bytes: Some(()), integer: Some(()), float: Some(()), boolean: Some(()), timestamp: Some(()), regex: Some(()), null: Some(()), array: Some(()), object: Some(()) })) }) }, meaning: {}, log_namespaces: {Legacy} } } })
I have reproduced the panic with the following config:
[sources.input]
type = "stdin"
decoding.codec = "native_json"
[transforms.remap]
type = "remap"
inputs = ["input"]
source = """
.logapp_name = string(.logapp_name) ?? "unknown"
.logapp_name = downcase(.logapp_name)
"""
[sinks.output]
type = "console"
inputs = ["remap"]
encoding.codec = "native_json"
Interestingly, replacing the source with the following does not cause the error:
.logapp_name = downcase(string(.logapp_name) ?? "unknown")
I used the following data as input on stdin:
{"metric":{"name":"count","namespace":null,"tags":null,"timestamp":"2023-04-29T02:03:00Z","kind":"incremental","counter":{"value":1.0}}}
@djrodgerspryor While we work out a fix for this, the above suggests a workaround you can use right now to help you move forward. If you merge the first assignment to .logapp_name into the second, or use a local variable instead of a field (i.e. no leading .), it should pass. Of course, routing metrics away from this remap is also a potential workaround.
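Concretely, either of these shapes should work (the first is the tested form from above; the second is an untested sketch of the local-variable variant):
# Workaround 1: merge the two assignments into one
.logapp_name = downcase(string(.logapp_name) ?? "unknown")
# Workaround 2: do the fallible coercion in a local variable, then assign once
logapp_name = string(.logapp_name) ?? "unknown"
.logapp_name = downcase(logapp_name)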
Thanks!
Any tips on how to route the metrics separately from the logs? I can't see any way to do it from the docs, but I'm probably just missing something.
Hey @djrodgerspryor!
You can use a route transform to route differently based on event type, like:
[transforms.my_route]
type = "route"
route.logs = { type = "is_log" }
route.metrics = { type = "is_metric" }
Then you can consume those routes separately to handle the logs and metrics differently.
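Applied to the config above, the wiring might look like this sketch (logs keep flowing into the existing remap chain while metrics go straight to the exporter; route outputs are addressed as <transform_id>.<route_name>):
[transforms.my_route]
type = "route"
inputs = ["vector_docker_bridge", "vector_host"]
route.logs = { type = "is_log" }
route.metrics = { type = "is_metric" }
# Logs continue through the existing transforms (remap source unchanged)
[transforms.tagged_vector]
type = "remap"
inputs = ["my_route.logs"]
# Metrics bypass the remap chain entirely
[sinks.prometheus_export]
inputs = ["vector_internal_metrics", "host_metrics", "my_route.metrics"]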
We feel that https://github.com/vectordotdev/vrl/issues/186 is the underlying issue here, so we'll close this one as a duplicate, but we appreciate you raising it! Let us know if you have any trouble routing the logs and metrics to process them separately.
Thanks, that's now working perfectly for us!
Problem
We're seeing vector frequently crash due to a panic in remap.rs.
Configuration
The relevant section of the all_logs transform (the only section with downcase and replace) is:
# Ensure that we have a logapp_name
.logapp_name = string(.logapp_name) ?? "unknown"
# Normalise the logapp name to be only lower case letters, numbers and dashes
.logapp_name = replace(downcase(.logapp_name), r'(?i)[^a-z0-9\\-\\.]+', "-")
Which looks like it shouldn't error (and vector validate agrees). I can't add additional error handling around downcase because vector validate thinks it's infallible.
I experimented with adding drop_on_error and drop_on_abort but it doesn't seem to change anything.
Version
vector 0.29.1 (x86_64-unknown-linux-gnu 74ae15e 2023-04-20 14:50:42.739094536)
Example Data
I don't know what input data is causing the error: it would be nice to print out the offending event if a panic is triggered.
Additional Context
Should vector be able to handle panics from bugs like this in transforms and fail gracefully? We've got a bunch of different logs and metrics flowing through this vector instance, and they're all being disrupted by these crashes. It would be nice if the impact of these kinds of bugs could be limited to the events which trigger them.