vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

feat(opentelemetry sink): new sink #21866

Open pront opened 13 hours ago

Summary

This PR introduces a new opentelemetry sink with basic support for pushing data to an OpenTelemetry (OTEL) endpoint, such as an OTEL Collector.

TODO for this PR:

There are numerous potential improvements:

Change Type

Is this a breaking change?

How did you test this PR?

Vector Config

sources:
  generate_syslog:
    type: "demo_logs"
    format: "syslog"
    count: 100000
    interval: 1

transforms:
  remap_syslog:
    inputs: ["generate_syslog"]
    type: "remap"
    source: |
      structured = parse_syslog!(.message)
      .timestamp_nanos = to_unix_timestamp!(structured.timestamp, unit: "nanoseconds")
      .body = structured
      .service_name = structured.appname
      .resource_attributes.source_type = .source_type
      .resource_attributes.host.hostname = structured.hostname
      .resource_attributes.service.name = structured.appname
      .attributes.syslog.procid = structured.procid
      .attributes.syslog.facility = structured.facility
      .attributes.syslog.version = structured.version
      .severity_text = if includes(["emerg", "err", "crit", "alert"], structured.severity) {
        "ERROR"
      } else if structured.severity == "warning" {
        "WARN"
      } else if structured.severity == "debug" {
        "DEBUG"
      } else if includes(["info", "notice"], structured.severity) {
        "INFO"
      } else {
        structured.severity
      }
      .scope_name = structured.msgid
      del(.message)
      del(.timestamp)
      del(.service)
      del(.source_type)

sinks:
  emit_syslog:
    inputs: ["remap_syslog"]
    type: opentelemetry
    protocol:
      type: http
      uri: http://localhost:5318/v1/logs
      method: post
      encoding:
        codec: json
      framing:
        method: newline_delimited
      headers:
        content-type: application/json

# console:
#   type: console
#   inputs: ["remap_syslog"]
#   encoding:
#     codec: json

Run Vector:

VECTOR_LOG=debug cargo run --color=always --profile dev -- --config /Users/pavlos.rontidis/CLionProjects/vector/pront/otel/otel-sink-test.yaml

OTEL config:

receivers:
  otlp:
    protocols:
      http:
        endpoint: "0.0.0.0:5318" # port targeted by the Vector sink's uri

exporters:
  debug:
  otlp:
    endpoint: localhost:4317
    tls:
      insecure: true

processors:
  batch: {} # Batch processor to optimize log export

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
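To sanity-check the collector config independently of Vector, a small script can POST a single log record to the same endpoint. The sketch below builds a payload in the standard OTLP/JSON shape (resourceLogs, scopeLogs, logRecords); it is not a statement about what the new sink emits on the wire. The helper names (build_logs_payload, post_logs) are hypothetical, and the URL is an assumption taken from the receiver endpoint and sink uri in this description.

```python
import json
import urllib.request

# Assumption: matches the OTLP/HTTP receiver endpoint in the collector config.
COLLECTOR_LOGS_URL = "http://localhost:5318/v1/logs"


def _kv(key, value):
    """Encode one attribute as an OTLP KeyValue with a string value."""
    return {"key": key, "value": {"stringValue": str(value)}}


def build_logs_payload(body, severity_text, time_unix_nano,
                       service_name, scope_name="", attributes=None):
    """Build an OTLP/JSON logs export request holding one log record."""
    record = {
        # 64-bit integers are encoded as decimal strings in OTLP/JSON.
        "timeUnixNano": str(time_unix_nano),
        "severityText": severity_text,
        "body": {"stringValue": body},
        "attributes": [_kv(k, v) for k, v in (attributes or {}).items()],
    }
    return {
        "resourceLogs": [{
            "resource": {"attributes": [_kv("service.name", service_name)]},
            "scopeLogs": [{
                "scope": {"name": scope_name},
                "logRecords": [record],
            }],
        }]
    }


def post_logs(payload, url=COLLECTOR_LOGS_URL, timeout=5.0):
    """POST the payload as JSON and return (status, response body)."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status, response.read().decode("utf-8")
```

With the collector running, post_logs(build_logs_payload("hello", "INFO", 1732223282000000000, "demo")) should make the record show up in the debug exporter output if the pipeline is wired as configured.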

Run OTEL collector:

./otelcol --config ./otel-collector-config.yaml

Sample Vector output:

/Users/pavlos.rontidis/.cargo/bin/cargo run --color=always --profile dev -- --config /Users/pavlos.rontidis/CLionProjects/vector/pront/otel/otel-sink-test.yaml
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.33s
     Running `target/debug/vector --config /Users/pavlos.rontidis/CLionProjects/vector/pront/otel/otel-sink-test.yaml`
2024-11-21T21:08:02.826597Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2024-11-21T21:08:02.826938Z  INFO vector::app: Log level is enabled. level="debug"
2024-11-21T21:08:02.827248Z DEBUG vector::app: messaged="Building runtime." worker_threads=10
2024-11-21T21:08:02.831422Z  INFO vector::app: Loading configs. paths=["/Users/pavlos.rontidis/CLionProjects/vector/pront/otel/otel-sink-test.yaml"]
2024-11-21T21:08:02.838985Z DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
2024-11-21T21:08:02.881344Z DEBUG vector::topology::builder: Building new source. component=generate_syslog
2024-11-21T21:08:02.883040Z DEBUG vector::topology::builder: Building new transform. component=remap_syslog
2024-11-21T21:08:02.919678Z DEBUG vector::topology::builder: Building new sink. component=emit_syslog
2024-11-21T21:08:02.923264Z  WARN sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector::sinks::util::http: Option `headers` has been deprecated. Use `request.headers` instead.
2024-11-21T21:08:02.976490Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector_core::tls::settings: Fetching system root certs.
2024-11-21T21:08:03.135270Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector_core::tls::settings: Fetching system root certs.
2024-11-21T21:08:03.275055Z  INFO vector::topology::running: Running healthchecks.
2024-11-21T21:08:03.275276Z DEBUG vector::topology::running: Connecting changed/added component(s).
2024-11-21T21:08:03.275409Z DEBUG vector::topology::running: Configuring outputs for source. component=generate_syslog
2024-11-21T21:08:03.275445Z DEBUG vector::topology::running: Configuring output for component. component=generate_syslog output_id=None
2024-11-21T21:08:03.275478Z DEBUG vector::topology::running: Configuring outputs for transform. component=remap_syslog
2024-11-21T21:08:03.275507Z DEBUG vector::topology::running: Configuring output for component. component=remap_syslog output_id=None
2024-11-21T21:08:03.275533Z DEBUG vector::topology::running: Connecting inputs for transform. component=remap_syslog
2024-11-21T21:08:03.275623Z DEBUG vector::topology::running: Adding component input to fanout. component=remap_syslog fanout_id=generate_syslog
2024-11-21T21:08:03.275778Z DEBUG vector::topology::running: Connecting inputs for sink. component=emit_syslog
2024-11-21T21:08:03.275811Z DEBUG vector::topology::running: Adding component input to fanout. component=emit_syslog fanout_id=remap_syslog
2024-11-21T21:08:03.275935Z  INFO vector::topology::builder: Healthcheck passed.
2024-11-21T21:08:03.276164Z DEBUG vector::topology::running: Spawning new source. key=generate_syslog
2024-11-21T21:08:03.276844Z DEBUG vector::topology::running: Spawning new transform. key=remap_syslog
2024-11-21T21:08:03.277416Z DEBUG source{component_kind="source" component_id=generate_syslog component_type=demo_logs}: vector::topology::builder: Source starting.
2024-11-21T21:08:03.277474Z DEBUG source{component_kind="source" component_id=generate_syslog component_type=demo_logs}: vector::topology::builder: Source pump supervisor starting.
2024-11-21T21:08:03.277637Z  INFO vector: Vector has started. debug="true" version="0.43.0" arch="aarch64" revision=""
2024-11-21T21:08:03.277575Z DEBUG transform{component_kind="transform" component_id=remap_syslog component_type=remap}: vector::topology::builder: Synchronous transform starting.
2024-11-21T21:08:03.277673Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector::topology::builder: Sink starting.
2024-11-21T21:08:03.278138Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2024-11-21T21:08:03.279620Z DEBUG source{component_kind="source" component_id=generate_syslog component_type=demo_logs}: vector::topology::builder: Source pump starting.
2024-11-21T21:08:03.279748Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}: vector::utilization: utilization=0.9986980710327271
2024-11-21T21:08:04.286928Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://localhost:5318/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.0-custom-a57556da0 (aarch64-apple-darwin debug=full)"} body=[539 bytes]
2024-11-21T21:08:04.287893Z DEBUG hyper::client::connect::dns: resolving host="localhost"
2024-11-21T21:08:04.290689Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: hyper::client::connect::http: connecting to 127.0.0.1:5318
2024-11-21T21:08:04.291385Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: hyper::client::connect::http: connected to 127.0.0.1:5318
2024-11-21T21:08:04.292470Z DEBUG hyper::proto::h1::io: flushed 757 bytes
2024-11-21T21:08:04.295217Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-11-21T21:08:04.295263Z DEBUG hyper::proto::h1::conn: incoming body is content-length (21 bytes)
2024-11-21T21:08:04.295382Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-11-21T21:08:04.295523Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: hyper::client::pool: pooling idle connection for ("http", localhost:5318)
2024-11-21T21:08:04.295706Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=1}:http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"content-type": "application/json", "date": "Thu, 21 Nov 2024 21:08:04 GMT", "content-length": "21"} body=[21 bytes]
2024-11-21T21:08:05.290103Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=2}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://localhost:5318/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.0-custom-a57556da0 (aarch64-apple-darwin debug=full)"} body=[1073 bytes]
2024-11-21T21:08:05.290279Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=2}:http: hyper::client::pool: reuse idle connection for ("http", localhost:5318)
2024-11-21T21:08:05.290494Z DEBUG hyper::proto::h1::io: flushed 1292 bytes
2024-11-21T21:08:05.291118Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-11-21T21:08:05.291131Z DEBUG hyper::proto::h1::conn: incoming body is content-length (21 bytes)
2024-11-21T21:08:05.291156Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-11-21T21:08:05.291212Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=2}:http: hyper::client::pool: pooling idle connection for ("http", localhost:5318)
2024-11-21T21:08:05.291273Z DEBUG sink{component_kind="sink" component_id=emit_syslog component_type=opentelemetry}:request{request_id=2}:http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 
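When repeating this test with a longer run, it can help to confirm that every request received a 200 rather than reading the debug log by eye. The sketch below pairs the "Sending HTTP request." and "HTTP response." events by request_id. The regular expressions are assumptions based only on the log lines shown above, and summarize_requests is a hypothetical helper, not a Vector tool.

```python
import re

# Patterns keyed on the http_client debug events shown in the sample output.
_REQUEST = re.compile(
    r"request\{request_id=(\d+)\}.*Sending HTTP request\..*body=\[(\d+) bytes\]"
)
_RESPONSE = re.compile(
    r"request\{request_id=(\d+)\}.*HTTP response\. status=(\d+)"
)


def summarize_requests(lines):
    """Return {request_id: {"sent_bytes": int, "status": int}} from debug logs."""
    summary = {}
    for line in lines:
        sent = _REQUEST.search(line)
        if sent:
            entry = summary.setdefault(int(sent.group(1)), {})
            entry["sent_bytes"] = int(sent.group(2))
            continue
        received = _RESPONSE.search(line)
        if received:
            entry = summary.setdefault(int(received.group(1)), {})
            entry["status"] = int(received.group(2))
    return summary
```

Feeding it the lines of a saved log file, for example summarize_requests(open(path)) with path pointing at the captured output, gives one entry per request; any entry without a status of 200 is worth a closer look.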

Does this PR include user facing changes?

Checklist

References