vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

Orphaned buffer lock #18831

Open nabokihms opened 10 months ago

nabokihms commented 10 months ago


Problem

When I try to reload my vector configuration, it says in the log:

2023-10-12T15:56:16.714921Z ERROR vector::topology: Configuration error. error=Sink "destination/cluster/loki-ebapp": error occurred when building buffer: failed to build individual stage 0: failed to load/create ledger: failed to lock buffer.lock; is another Vector process running and using this buffer?
2023-10-12T15:56:16.714954Z  WARN vector::topology::running: Failed to completely load new configuration. Restoring old configuration.
2023-10-12T15:56:16.714997Z  INFO vector::topology::running: Running healthchecks.
2023-10-12T15:56:16.716437Z  INFO vector::topology::running: Old configuration restored successfully.

This happens because there is a lock file in the buffer directory, created three days ago:

root@log-shipper-agent-8r87q:/# ls -lah /vector-data/buffer/v2/destination/cluster/loki-ebapp/
total 197M
drwxr-xr-x 2 root root 4.0K Oct  9 21:01 .
drwxr-xr-x 3 root root 4.0K Oct  9 05:54 ..
-rw-r--r-- 1 root root  70M Oct  9 21:00 buffer-data-58.dat
-rw-r--r-- 1 root root 128M Oct  9 21:02 buffer-data-59.dat
-rw-r--r-- 1 root root   24 Oct 10 19:51 buffer.db
-rw-r--r-- 1 root root    0 Oct  9 21:01 buffer.lock

No other vector processes are running.

Configuration

root@log-shipper-agent-8r87q:/# cat /etc/vector/**/*
{
  "data_dir": "/vector-data",
  "expire_metrics_secs": 60,
  "api" : {
    "address" : "127.0.0.1:8686",
    "enabled" : true,
    "playground" : false
  },
  "log_schema": {
    "host_key": "host",
    "message_key": "message",
    "source_type_key": "source_type",
    "timestamp_key": "timestamp"
  },
  "sources": {
    "internal_metrics": {
      "type": "internal_metrics"
    }
  },
  "sinks": {
    "prometheus_sink": {
      "type": "prometheus_exporter",
      "inputs": [
        "internal_metrics"
      ],
      "address": "127.0.0.1:9090",
      "suppress_timestamp": true
    }
  }
}
{
  "sources": {
    "cluster_logging_config/ebapp:ebapp": {
      "type": "kubernetes_logs",
      "extra_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "extra_field_selector": "metadata.namespace=ebapp,metadata.name!=$VECTOR_SELF_POD_NAME",
      "extra_namespace_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "annotation_fields": {
        "container_image": "image",
        "container_name": "container",
        "pod_ip": "pod_ip",
        "pod_labels": "pod_labels",
        "pod_name": "pod",
        "pod_namespace": "namespace",
        "pod_node_name": "node",
        "pod_owner": "pod_owner"
      },
      "glob_minimum_cooldown_ms": 1000,
      "use_apiserver_cache": true
    },
    "cluster_logging_config/ebapp_app-one:ebapp": {
      "type": "kubernetes_logs",
      "extra_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "extra_field_selector": "metadata.namespace=ebapp,metadata.name!=$VECTOR_SELF_POD_NAME",
      "extra_namespace_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "annotation_fields": {
        "container_image": "image",
        "container_name": "container",
        "pod_ip": "pod_ip",
        "pod_labels": "pod_labels",
        "pod_name": "pod",
        "pod_namespace": "namespace",
        "pod_node_name": "node",
        "pod_owner": "pod_owner"
      },
      "glob_minimum_cooldown_ms": 1000,
      "use_apiserver_cache": true
    },
    "cluster_logging_config/ebapp_app-two:ebapp": {
      "type": "kubernetes_logs",
      "extra_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "extra_field_selector": "metadata.namespace=ebapp,metadata.name!=$VECTOR_SELF_POD_NAME",
      "extra_namespace_label_selector": "log-shipper.deckhouse.io/exclude notin (true)",
      "annotation_fields": {
        "container_image": "image",
        "container_name": "container",
        "pod_ip": "pod_ip",
        "pod_labels": "pod_labels",
        "pod_name": "pod",
        "pod_namespace": "namespace",
        "pod_node_name": "node",
        "pod_owner": "pod_owner"
      },
      "glob_minimum_cooldown_ms": 1000,
      "use_apiserver_cache": true
    }
  },
  "transforms": {
    "transform/destination/loki-ebapp/00_ratelimit": {
      "exclude": "null",
      "inputs": [
        "transform/source/ebapp/02_local_timezone"
      ],
      "threshold": 1000,
      "type": "throttle",
      "window_secs": 60
    },
    "transform/source/ebapp/00_owner_ref": {
      "drop_on_abort": false,
      "inputs": [
        "cluster_logging_config/ebapp:ebapp"
      ],
      "source": "if exists(.pod_owner) {\n    .pod_owner = string!(.pod_owner)\n\n    if starts_with(.pod_owner, \"ReplicaSet/\") {\n        hash = \"-\"\n        if exists(.pod_labels.\"pod-template-hash\") {\n            hash = hash + string!(.pod_labels.\"pod-template-hash\")\n        }\n\n        if hash != \"-\" \u0026\u0026 ends_with(.pod_owner, hash) {\n            .pod_owner = replace(.pod_owner, \"ReplicaSet/\", \"Deployment/\")\n            .pod_owner = replace(.pod_owner, hash, \"\")\n        }\n    }\n\n    if starts_with(.pod_owner, \"Job/\") {\n        if match(.pod_owner, r'-[0-9]{8,11}$') {\n            .pod_owner = replace(.pod_owner, \"Job/\", \"CronJob/\")\n            .pod_owner = replace(.pod_owner, r'-[0-9]{8,11}$', \"\")\n        }\n    }\n}",
      "type": "remap"
    },
    "transform/source/ebapp/01_clean_up": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp/00_owner_ref"
      ],
      "source": "if exists(.pod_labels.\"controller-revision-hash\") {\n    del(.pod_labels.\"controller-revision-hash\")\n}\nif exists(.pod_labels.\"pod-template-hash\") {\n    del(.pod_labels.\"pod-template-hash\")\n}\nif exists(.kubernetes) {\n    del(.kubernetes)\n}\nif exists(.file) {\n    del(.file)\n}",
      "type": "remap"
    },
    "transform/source/ebapp/02_local_timezone": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp/01_clean_up"
      ],
      "source": "if exists(.\"timestamp\") {\n    ts = parse_timestamp!(.\"timestamp\", format: \"%+\")\n    .\"timestamp\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}\n\nif exists(.\"timestamp_end\") {\n    ts = parse_timestamp!(.\"timestamp_end\", format: \"%+\")\n    .\"timestamp_end\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-one/00_owner_ref": {
      "drop_on_abort": false,
      "inputs": [
        "cluster_logging_config/ebapp_app-one:ebapp"
      ],
      "source": "if exists(.pod_owner) {\n    .pod_owner = string!(.pod_owner)\n\n    if starts_with(.pod_owner, \"ReplicaSet/\") {\n        hash = \"-\"\n        if exists(.pod_labels.\"pod-template-hash\") {\n            hash = hash + string!(.pod_labels.\"pod-template-hash\")\n        }\n\n        if hash != \"-\" \u0026\u0026 ends_with(.pod_owner, hash) {\n            .pod_owner = replace(.pod_owner, \"ReplicaSet/\", \"Deployment/\")\n            .pod_owner = replace(.pod_owner, hash, \"\")\n        }\n    }\n\n    if starts_with(.pod_owner, \"Job/\") {\n        if match(.pod_owner, r'-[0-9]{8,11}$') {\n            .pod_owner = replace(.pod_owner, \"Job/\", \"CronJob/\")\n            .pod_owner = replace(.pod_owner, r'-[0-9]{8,11}$', \"\")\n        }\n    }\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-one/01_clean_up": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp_app-one/00_owner_ref"
      ],
      "source": "if exists(.pod_labels.\"controller-revision-hash\") {\n    del(.pod_labels.\"controller-revision-hash\")\n}\nif exists(.pod_labels.\"pod-template-hash\") {\n    del(.pod_labels.\"pod-template-hash\")\n}\nif exists(.kubernetes) {\n    del(.kubernetes)\n}\nif exists(.file) {\n    del(.file)\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-one/02_local_timezone": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp_app-one/01_clean_up"
      ],
      "source": "if exists(.\"timestamp\") {\n    ts = parse_timestamp!(.\"timestamp\", format: \"%+\")\n    .\"timestamp\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}\n\nif exists(.\"timestamp_end\") {\n    ts = parse_timestamp!(.\"timestamp_end\", format: \"%+\")\n    .\"timestamp_end\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-one/03_label_filter": {
      "condition": "if is_boolean(.pod_labels.app) || is_float(.pod_labels.app) {\n    data, err = to_string(.pod_labels.app);\n    if err != null {\n        false;\n    } else {\n        includes([\"ebapp-first\"], data);\n    };\n} else if .pod_labels.app == null {\n    \"null\";\n} else {\n    includes([\"ebapp-first\"], .pod_labels.app);\n}",
      "inputs": [
        "transform/source/ebapp_app-one/02_local_timezone"
      ],
      "type": "filter"
    },
    "transform/source/ebapp_app-two/00_owner_ref": {
      "drop_on_abort": false,
      "inputs": [
        "cluster_logging_config/ebapp_app-two:ebapp"
      ],
      "source": "if exists(.pod_owner) {\n    .pod_owner = string!(.pod_owner)\n\n    if starts_with(.pod_owner, \"ReplicaSet/\") {\n        hash = \"-\"\n        if exists(.pod_labels.\"pod-template-hash\") {\n            hash = hash + string!(.pod_labels.\"pod-template-hash\")\n        }\n\n        if hash != \"-\" \u0026\u0026 ends_with(.pod_owner, hash) {\n            .pod_owner = replace(.pod_owner, \"ReplicaSet/\", \"Deployment/\")\n            .pod_owner = replace(.pod_owner, hash, \"\")\n        }\n    }\n\n    if starts_with(.pod_owner, \"Job/\") {\n        if match(.pod_owner, r'-[0-9]{8,11}$') {\n            .pod_owner = replace(.pod_owner, \"Job/\", \"CronJob/\")\n            .pod_owner = replace(.pod_owner, r'-[0-9]{8,11}$', \"\")\n        }\n    }\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-two/01_clean_up": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp_app-two/00_owner_ref"
      ],
      "source": "if exists(.pod_labels.\"controller-revision-hash\") {\n    del(.pod_labels.\"controller-revision-hash\")\n}\nif exists(.pod_labels.\"pod-template-hash\") {\n    del(.pod_labels.\"pod-template-hash\")\n}\nif exists(.kubernetes) {\n    del(.kubernetes)\n}\nif exists(.file) {\n    del(.file)\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-two/02_local_timezone": {
      "drop_on_abort": false,
      "inputs": [
        "transform/source/ebapp_app-two/01_clean_up"
      ],
      "source": "if exists(.\"timestamp\") {\n    ts = parse_timestamp!(.\"timestamp\", format: \"%+\")\n    .\"timestamp\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}\n\nif exists(.\"timestamp_end\") {\n    ts = parse_timestamp!(.\"timestamp_end\", format: \"%+\")\n    .\"timestamp_end\" = format_timestamp!(ts, format: \"%+\", timezone: \"local\")\n}",
      "type": "remap"
    },
    "transform/source/ebapp_app-two/03_label_filter": {
      "condition": "if is_boolean(.pod_labels.app) || is_float(.pod_labels.app) {\n    data, err = to_string(.pod_labels.app);\n    if err != null {\n        false;\n    } else {\n        includes([\"ebapp-second\"], data);\n    };\n} else if .pod_labels.app == null {\n    \"null\";\n} else {\n    includes([\"ebapp-second\"], .pod_labels.app);\n}",
      "inputs": [
        "transform/source/ebapp_app-two/02_local_timezone"
      ],
      "type": "filter"
    }
  },
  "sinks": {
    "destination/cluster/loki-ebapp": {
      "type": "loki",
      "inputs": [
        "transform/destination/loki-ebapp/00_ratelimit"
      ],
      "healthcheck": {
        "enabled": false
      },
      "buffer": {
        "max_size": 268435488,
        "type": "disk",
        "when_full": "drop_newest"
      },
      "encoding": {
        "only_fields": [
          "message"
        ],
        "codec": "text",
        "timestamp_format": "rfc3339"
      },
      "endpoint": "http://loki.d8-monitoring:3100",
      "tls": {
        "verify_hostname": true,
        "verify_certificate": true
      },
      "labels": {
        "container": "{{ container }}",
        "host": "{{ host }}",
        "image": "{{ image }}",
        "namespace": "{{ namespace }}",
        "node": "{{ node }}",
        "pod": "{{ pod }}",
        "pod_ip": "{{ pod_ip }}",
        "pod_labels_*": "{{ pod_labels }}",
        "pod_owner": "{{ pod_owner }}",
        "stream": "{{ stream }}"
      },
      "remove_label_fields": true,
      "out_of_order_action": "rewrite_timestamp"
    }
  }
}

Version

v0.32.0

Debug Output

No response

Example Data

No response

Additional Context

No response

References

No response

nabokihms commented 10 months ago

This is probably due to a non-graceful termination of the Vector process. Maybe it would be better to switch to time-based locking for Vector: take the lock for, say, 5 minutes and renew it every two minutes. If the lock is not renewed, another Vector process is allowed to take it over.

tobz commented 10 months ago

Yep, this is definitely a limitation and something that isn't handled terribly well with non-graceful shutdown.

We would likely want to make it work somewhat more like a PID file, where any Vector process that wants to take the lock will confirm, if the lock file already exists, that the PID referenced within it is still running... otherwise, it would delete the lock file and take it over.

(Just to comment on it: I would want to avoid a time-based solution because I don't want another process trying to take over the buffer if there was a simple bug in the background task responsible for updating lock liveness. If there's a stalled Vector process, something might still be happening to the disk buffer, so we can only be sure that nothing is touching it if the PID itself is gone: process not running, can't possibly be modifying the disk buffer.)
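The PID-file idea could be sketched roughly like this (a hypothetical shell sketch, not Vector's actual implementation; it assumes Linux, where /proc/&lt;pid&gt; exists only while the process is alive, and the lock path is illustrative):

```shell
#!/usr/bin/env sh
# Hypothetical sketch of PID-file-style lock takeover; not Vector's actual code.
# Assumes Linux, where /proc/<pid> exists only while the process is running.
LOCK="${LOCK:-$(mktemp -d)/buffer.lock}"

if [ -s "$LOCK" ]; then
    owner=$(cat "$LOCK")
    if [ -d "/proc/$owner" ]; then
        echo "buffer.lock held by live PID $owner; refusing takeover" >&2
        exit 1
    fi
    # The owning process is gone, so nothing can be writing to the buffer:
    # the stale lock is safe to reclaim.
    rm -f "$LOCK"
fi
echo "$$" > "$LOCK"
echo "lock taken by PID $$"
```

This captures the invariant tobz describes: the lock is only stolen when the recorded owner provably no longer exists, never on a timer.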

nabokihms commented 10 months ago

Fantastic idea, thanks for clarifying, @tobz.

How will it work in a container? Will Vector need access to the host PID namespace?

nabokihms commented 10 months ago

I assume the lock is not required for Vector running in a Kubernetes cluster, because the DaemonSet controller already ensures that two pods are not running on the same node. As a workaround, our Vector installation removes the lock in the entrypoint every time the container starts.
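That entrypoint workaround might look like this (an illustrative sketch; the paths and the VECTOR_DATA_DIR variable are my assumptions, and it is only safe when the orchestrator guarantees a single Vector instance per data directory):

```shell
#!/usr/bin/env sh
# Illustrative container entrypoint: drop stale buffer locks before Vector starts.
# Only safe when the orchestrator (e.g. a DaemonSet) guarantees that a single
# Vector instance uses this data directory.
clean_buffer_locks() {
    find "$1" -name 'buffer.lock' -type f -delete
}

DATA_DIR="${VECTOR_DATA_DIR:-/vector-data}"
[ -d "$DATA_DIR" ] && clean_buffer_locks "$DATA_DIR"

# Hand off to Vector when the binary is present (always true in a real image).
if command -v vector >/dev/null 2>&1; then
    exec vector --config /etc/vector/vector.json "$@"
fi
```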

And, of course, it would be nice to finalize the design for this feature to implement advanced locking capabilities in Vector.

dsmith3197 commented 8 months ago

I was able to verify that this occurs when removing and then re-adding a sink with a disk buffer, and I reproduced the behavior with the following sequence:

  1. Start Vector with the config below: VECTOR_SELF_NODE_NAME=abc VECTOR_SELF_POD_NAME=def vector --config config.json
  2. Rename destination/cluster/loki-ebapp to destination/cluster/loki-ebapp-1 in the config.
  3. Reload the config: kill -HUP $(pgrep vector).
  4. Rename destination/cluster/loki-ebapp-1 to destination/cluster/loki-ebapp in the config.
  5. Reload the config: kill -HUP $(pgrep vector).

This results in

2023-12-12T16:56:04.063075Z ERROR vector::topology::builder: Configuration error. error=Sink "destination/cluster/loki-ebapp-1": error occurred when building buffer: failed to build individual stage 0: failed to load/create ledger: failed to lock buffer.lock; is another Vector process running and using this buffer?

Notably, this behavior occurs when the sink is loki (and also clickhouse) but not when the sink is blackhole.

{
  "data_dir": "./vector-data",
  "sources": {
    "cluster_logging_config/ebapp:ebapp": {
      "type": "demo_logs",
      "format": "json"
    }
  },
  "sinks": {
    "destination/cluster/loki-ebapp": {
      "type": "loki",
      "inputs": [
        "cluster_logging_config/ebapp:ebapp"
      ],
      "healthcheck": {
        "enabled": false
      },
      "buffer": {
        "max_size": 268435488,
        "type": "disk",
        "when_full": "drop_newest"
      },
      "encoding": {
        "only_fields": [
          "message"
        ],
        "codec": "text",
        "timestamp_format": "rfc3339"
      },
      "endpoint": "http://loki.d8-monitoring:3100",
      "tls": {
        "verify_hostname": true,
        "verify_certificate": true
      },
      "labels": {
        "container": "{{ container }}",
        "host": "{{ host }}",
        "image": "{{ image }}",
        "namespace": "{{ namespace }}",
        "node": "{{ node }}",
        "pod": "{{ pod }}",
        "pod_ip": "{{ pod_ip }}",
        "pod_labels_*": "{{ pod_labels }}",
        "pod_owner": "{{ pod_owner }}",
        "stream": "{{ stream }}"
      },
      "remove_label_fields": true,
      "out_of_order_action": "rewrite_timestamp"
    }
  }
}
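The rename/reload cycle above can be scripted roughly as follows (a sketch: the helper names are mine, not Vector tooling, and it assumes config.json in the current directory and a vector binary on PATH):

```shell
#!/usr/bin/env sh
# Sketch of the repro sequence: rename a disk-buffered sink, reload, rename it
# back, reload again. Helper names are illustrative, not Vector tooling.
rename_sink() { # usage: rename_sink <old-id> <new-id> <config-file>
    sed -i.bak "s|\"$1\"|\"$2\"|g" "$3"
}

reload_vector() {
    kill -HUP "$(pgrep -x vector)"
}

# Uncomment to run against a live Vector process:
# VECTOR_SELF_NODE_NAME=abc VECTOR_SELF_POD_NAME=def vector --config config.json &
# sleep 5
# rename_sink 'destination/cluster/loki-ebapp' 'destination/cluster/loki-ebapp-1' config.json
# reload_vector
# rename_sink 'destination/cluster/loki-ebapp-1' 'destination/cluster/loki-ebapp' config.json
# reload_vector
```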

This is a bug that we should resolve.

dsmith3197 commented 8 months ago

We would likely want to make it work somewhat more like a PID file, where any Vector process that wants to take the lock will confirm, if the lock file already exists, that the PID referenced within it is still running... otherwise, it would delete the lock file and take it over.

Regarding this approach: the fslock crate leverages flock (ref), which automatically releases the lock when all file descriptors referencing it are closed. As such, our implementation already handles non-graceful shutdowns.

nabokihms commented 7 months ago

@dsmith3197 thanks for the info. Do you have any ideas why this problem occurs when the sink is Loki?

jszwedko commented 7 months ago

@dsmith3197 thanks for the info. Do you have any ideas why this problem occurs when the sink is Loki?

What we were able to reproduce is that Vector doesn't correctly release disk buffer locks during reload when components are removed, so if a component is re-added with the same name it'll fail to lock. Does that match how you ran into this issue? Dropping a component, reloading, re-adding a component with the same name, reloading?

nabokihms commented 7 months ago

In my case, a sidecar application composes a config file for Vector and reloads it. A sink with the same id can be removed and added multiple times, so yes, I think this is my case. We rely heavily on reloading.

jszwedko commented 7 months ago

In my case, a sidecar application composes a config file for Vector and reloads it. A sink with the same id can be removed and added multiple times, so yes, I think this is my case. We rely heavily on reloading.

👍 thanks for confirming! I don't think it is specific to the loki sink then. You'd see the same error for any sinks using disk buffers if they are removed and re-added via vector reload.