vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0
17.67k stars 1.56k forks source link

type:kubernetes_logs doesn't support Windows paths when enriching the data it collects with Kubernetes context #18521

Closed tmb-piXel closed 1 day ago

tmb-piXel commented 1 year ago

A note for the community

Problem

Hi. We run Vector in a Kubernetes cluster with Windows worker nodes. On a Windows node, log file paths look like "/var\log\pods\sandbox0-ns_sandbox0-name_sandbox0-uid\sandbox0-container0-name\1.log". Vector cannot parse this path, and I get the error "Failed to annotate event with pod metadata.". The problem is that the Windows path delimiter is not handled in https://github.com/vectordotdev/vector/blob/master/src/sources/kubernetes_logs/path_helpers.rs#L37 . This can be fixed with a few lines of code. For example:

    /// Path separator that appears in log file paths on Windows nodes (a single backslash).
    const WINDOWS_DELIMITER: &str = "\\";
    /// Path separator that appears in log file paths on Linux nodes.
    const LINUX_DELIMITER: &str = "/";

    pub(super) fn parse_log_file_path(path: &str) -> Option<LogFileInfo<'_>> {
    let delimiter;

    if path.chars().nth(4) == Some('/') {
        delimiter = LINUX_DELIMITER;
    } else {
        delimiter = WINDOWS_DELIMITER;
    }

    let mut components = path.rsplit(&delimiter);

    let _log_file_name = components.next()?;
    let container_name = components.next()?;
    let pod_dir = components.next()?;

    let mut pod_dir_components = pod_dir.rsplit(LOG_PATH_DELIMITER);

    let pod_uid = pod_dir_components.next()?;
    let pod_name = pod_dir_components.next()?;
    let pod_namespace = pod_dir_components.next()?;

    Some(LogFileInfo {
        pod_namespace,
        pod_name,
        pod_uid,
        container_name,
    })

I hope this will be fixed soon, thanks.

Configuration

data_dir = "C:/vector/data_dir"

[sources.kuber_logs]
type = "kubernetes_logs"
data_dir = "C:/vector/data_dir"
self_node_name = "$HOSTNAME"

[sinks.kafka]
type = "kafka"
inputs = [ "kuber_logs" ]
bootstrap_servers = "********"
topic = "******"
sasl.enabled = true
sasl.mechanism = "SCRAM-SHA-256"
sasl.username = "********"
sasl.password = "********"
batch.timeout_sec = 10
buffer.type = "memory"
buffer.max_events = 100
encoding.codec = "json"

Version

vector 0.30.0 (x86_64-pc-windows-msvc 38c3f0b)

Debug Output

2023-09-09T20:18:57.385530Z ERROR 
source{component_kind="source" component_id=kuber_logs component_type=kubernetes_logs component_name=kuber_logs}:
vector::internal_events::kubernetes_logs: Failed to annotate event with pod metadata. 
event=Log(LogEvent { inner: Inner { fields: Object({"file": Bytes(b"/var\\log\\pods\\app-api-dev_app-api-74c5965b96-5hwhj_87cdc860-401b-4c5b-ba99-96c81db2ca64\\app-api\\0.log"), "message": Bytes(b"log message"), "source_type": Bytes(b"kubernetes_logs")}), size_cache: AtomicCell { value: None }, json_encoded_size_cache: AtomicCell { value: Some(447) } }, metadata: EventMetadata { value: Object({}), secrets: {}, finalizers: EventFinalizers([]), source_id: None, schema_definition: Definition { event_kind: Kind { bytes: Some(()), integer: Some(()), float: Some(()), boolean: Some(()), timestamp: Some(()), regex: Some(()), null: Some(()), undefined: Some(()), array: Some(Collection { known: {}, unknown: Unknown(Infinite(Infinite { bytes: Some(()), integer: Some(()), float: Some(()), boolean: Some(()), timestamp: Some(()), regex: Some(()), null: Some(()), array: Some(()), object: Some(()) })) }), object: Some(Collection { known: {}, unknown: Unknown(Infinite(Infinite { bytes: Some(()), integer: Some(()), float: Some(()), boolean: Some(()), timestamp: Some(()), regex: Some(()), null: Some(()), array: Some(()), object: Some(()) })) }) }, metadata_kind: Kind { bytes: None, integer: None, float: None, boolean: None, timestamp: None, regex: None, null: None, undefined: None, array: None, object: Some(Collection { known: {}, unknown: Unknown(Infinite(Infinite { bytes: Some(()), integer: Some(()), float: Some(()), boolean: Some(()), timestamp: Some(()), regex: Some(()), null: Some(()), array: Some(()), object: Some(()) })) }) }, meaning: {}, log_namespaces: {Vector, Legacy} } } }) error_code="annotation_failed" error_type="reader_failed" stage="processing" internal_log_rate_limit=true

Example Data

This code can be tested at https://play.rust-lang.org/

/// Path separator that appears in log file paths on Windows nodes (a single backslash).
const WINDOWS_DELIMITER: &str = "\\";
/// Path separator that appears in log file paths on Linux nodes.
const LINUX_DELIMITER: &str = "/";

fn main() {
    let path_linux = String::from("/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/sandbox0-container0-name/1.log");
    let path_windows = String::from("/var\\log\\pods\\sandbox0-ns_sandbox0-name_sandbox0-uid\\sandbox0-container0-name\\1.log");

    let file_info_linux = parse_log_file_path(&path_linux);
    println!("{:?}", file_info_linux);
    let file_info_windows = parse_log_file_path(&path_windows);
    println!("{:?}", file_info_windows);
}

/// Extracts pod and container identity from the path of a pod log file.
///
/// The last three path components are read as the pod directory, the
/// container name and the log file name, in that order. The pod directory
/// name is then split on `_` into namespace, pod name and pod UID.
///
/// Both `/` and `\` are accepted as component separators, so Linux paths,
/// Windows paths and mixed paths such as `/var\log\pods\...` all parse the
/// same way regardless of what the leading components look like.
///
/// Returns `None` when the path has fewer than three components or the pod
/// directory name has fewer than three `_`-separated parts.
fn parse_log_file_path(path: &str) -> Option<LogFileInfo<'_>> {
    // Treat every character of either delimiter constant as a separator
    // instead of guessing the platform from a fixed character position,
    // which breaks as soon as the path does not start with `/var`.
    let is_delimiter = |c: char| LINUX_DELIMITER.contains(c) || WINDOWS_DELIMITER.contains(c);
    let mut components = path.rsplit(is_delimiter);

    // Walk the path from the end: file name, container directory, pod directory.
    let _log_file_name = components.next()?;
    let container_name = components.next()?;
    let pod_dir = components.next()?;

    // The pod directory name is also read right-to-left: UID, name, namespace.
    let mut pod_dir_components = pod_dir.rsplit('_');

    let pod_uid = pod_dir_components.next()?;
    let pod_name = pod_dir_components.next()?;
    let pod_namespace = pod_dir_components.next()?;

    Some(LogFileInfo {
        pod_namespace,
        pod_name,
        pod_uid,
        container_name,
    })
}

/// Pod and container identity extracted from a log file path.
///
/// Every field borrows from the path string that was parsed, so a value of
/// this type cannot outlive that string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogFileInfo<'a> {
    /// Namespace portion of the pod directory name (the part before the pod name).
    pub pod_namespace: &'a str,
    /// Pod name portion of the pod directory name (between namespace and UID).
    pub pod_name: &'a str,
    /// Pod UID portion of the pod directory name (the last `_`-separated part).
    pub pod_uid: &'a str,
    /// Name of the directory that directly contains the log file.
    pub container_name: &'a str,
}

Additional Context

No response

References

No response

jszwedko commented 1 day ago

Superseded by https://github.com/vectordotdev/vector/pull/21505