vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

Error decoding Snappy compressed Protobuf message from Grafana Agent in HTTP Server Source #20042

Open senkr132 opened 5 months ago

senkr132 commented 5 months ago

Problem

Some of our sources send logs using Grafana Agent, and we would like to use the HTTP server source to consume the protobuf messages and transform them before sending them to our log aggregation system (Loki). We tried to decode the incoming message using codec: protobuf, but it fails to parse the incoming payload.

Configuration

api:
  enabled: true
sources:
  http_server:
    type: http_server
    address: 0.0.0.0:8080
    method: POST
    path: /loki/api/v1/push
    decoding:
      codec: protobuf
      protobuf:
        desc_file: protobuf_descriptor_set.desc
        message_type: logproto.PushRequest
sinks:
  my_sink_id:
    type: file
    inputs:
      - http_server
    path: vector-%Y-%m-%d.log
    encoding:
      codec: text

Version

0.36.0

Debug Output

2024-03-08T22:08:33.849787Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-03-08T22:08:33.850106Z TRACE hyper::proto::h1::conn: flushed({role=server}): State { reading: Init, writing: Init, keep_alive: Busy }
2024-03-08T22:08:33.851324Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-03-08T22:08:33.851910Z TRACE hyper::proto::h1::io: received 328 bytes
2024-03-08T22:08:33.852540Z TRACE parse_headers: hyper::proto::h1::role: Request.parse bytes=328
2024-03-08T22:08:33.853081Z TRACE parse_headers: hyper::proto::h1::role: Request.parse Complete(143)
2024-03-08T22:08:33.853602Z DEBUG hyper::proto::h1::io: parsed 4 headers
2024-03-08T22:08:33.854111Z DEBUG hyper::proto::h1::conn: incoming body is content-length (185 bytes)
2024-03-08T22:08:33.854652Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: vector::internal_events::http: Received HTTP request. internal_log_rate_limit=true
2024-03-08T22:08:33.855231Z TRACE source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: warp::filters::method: method::POST?: POST
2024-03-08T22:08:33.855717Z TRACE hyper::proto::h1::decode: decode; state=Length(185)
2024-03-08T22:08:33.856116Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-03-08T22:08:33.856450Z TRACE source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: warp::filters::path: "loki"?: "loki"
2024-03-08T22:08:33.856799Z TRACE source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: warp::filters::path: "api"?: "api"
2024-03-08T22:08:33.857235Z TRACE source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: warp::filters::path: "v1"?: "v1"
2024-03-08T22:08:33.857570Z TRACE source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: warp::filters::path: "push"?: "push"
2024-03-08T22:08:33.857922Z TRACE source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: warp::filters::header: optional("authorization")
2024-03-08T22:08:33.858309Z TRACE source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: warp::filters::header: optional("content-encoding")
2024-03-08T22:08:33.858647Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: warp::filters::query: route was called without a query string, defaulting to empty
2024-03-08T22:08:33.858982Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: vector::sources::util::http::prelude: Handling HTTP request. headers={"host": "localhost:8080", "user-agent": "GrafanaAgent/", "content-length": "185", "content-type": "application/x-protobuf"}
2024-03-08T22:08:33.859519Z TRACE source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: vector::internal_events::http: Bytes received. byte_size=185 http_path=/loki/api/v1/push protocol=http
2024-03-08T22:08:33.860098Z ERROR source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: vector::internal_events::codecs: Failed deserializing frame. error=Error parsing protobuf: DecodeError { description: "invalid tag value: 0", stack: [] } error_code="decoder_deserialize" error_type="parser_failed" stage="processing" internal_log_rate_limit=true
2024-03-08T22:08:33.860471Z  WARN source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: vector::internal_events::http: Received bad request. error=Failed decoding body: ParsingError(Error parsing protobuf: DecodeError { description: "invalid tag value: 0", stack: [] }) error_code=http_response_400 error_type="request_failed" error_stage="receiving" http_code=400 internal_log_rate_limit=true
2024-03-08T22:08:33.860789Z TRACE source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/loki/api/v1/push}: warp::filters::method: method::GET?: POST
2024-03-08T22:08:33.861155Z TRACE encode_headers: hyper::proto::h1::role: Server::encode status=400, body=Some(Known(149)), req_method=Some(POST)
2024-03-08T22:08:33.861440Z TRACE hyper::proto::h1::encode: sized write, len = 149
2024-03-08T22:08:33.861712Z TRACE hyper::proto::h1::io: buffer.flatten self.len=118 buf.len=149
2024-03-08T22:08:33.862088Z DEBUG hyper::proto::h1::io: flushed 267 bytes
2024-03-08T22:08:33.862293Z TRACE hyper::proto::h1::conn: flushed({role=server}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-03-08T22:08:33.862724Z TRACE hyper::proto::h1::conn: Conn::read_head

Example Data

I just tailed a simple file with Grafana Agent.

Additional Context

I used the proto file from https://github.com/grafana/loki/blob/main/pkg/push/push.proto to generate the desc file, with the following command:

protoc --include_imports -o protobuf_descriptor_set.desc push.proto

jszwedko commented 5 months ago

@senkr132 Are you able to decode the incoming payload using protoc --decode and the desc file you created? That would help isolate the issue. If you could share one of the payloads here, too, that would be helpful.
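If protoc is not at hand, a rough first check on a captured payload is to walk its top-level fields according to the protobuf wire format. The sketch below is only an illustration: check_wire_format and read_varint are names made up here, the walker knows nothing about logproto.PushRequest, and it rejects a key with field number 0, which is what the "invalid tag value: 0" error in the debug output appears to be reporting.

```python
from __future__ import annotations


def read_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode one base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7
        if shift >= 70:
            raise ValueError("varint too long")


def check_wire_format(buf: bytes) -> list[tuple[int, int]]:
    """Walk the top-level fields of buf (hypothetical helper).

    Returns a list of (field_number, wire_type) pairs, or raises ValueError
    as soon as the bytes stop looking like protobuf wire format.
    """
    fields = []
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if field_number == 0:
            # Field numbers start at 1, so a key that decodes to 0 is invalid.
            raise ValueError("field number 0 is not valid")
        if wire_type == 0:        # varint
            _, pos = read_varint(buf, pos)
        elif wire_type == 1:      # fixed 64-bit
            pos += 8
        elif wire_type == 2:      # length-delimited (strings, bytes, sub-messages)
            length, pos = read_varint(buf, pos)
            pos += length
        elif wire_type == 5:      # fixed 32-bit
            pos += 4
        else:                     # 3/4 are deprecated groups, 6/7 are undefined
            raise ValueError(f"unsupported wire type {wire_type}")
        if pos > len(buf):
            raise ValueError("field runs past end of buffer")
        fields.append((field_number, wire_type))
    return fields
```

This only checks the outer framing, so arbitrary bytes can occasionally pass by chance. A failure is a strong hint that the bytes are not plain protobuf, but a pass is not proof that they match the schema.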

senkr132 commented 5 months ago

@jszwedko - I tried the following approach:

image

I also recorded the payload coming from Grafana Agent by letting it pass through the HTTP source and writing it to a file with the File sink, once using the raw_message codec and once using native.

My expertise with protobuf is limited - please let me know if there is anything else I can try.

I have attached the payloads written to file: vector-2024-03-08-native.log and vector-2024-03-08-raw-message.log
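Since the issue title describes the payload as Snappy-compressed, one more thing to try on a captured body is decompressing it before passing it to a protobuf decoder. Below is a minimal pure-Python sketch of a Snappy block-format decoder. It assumes the agent uses the raw block format rather than the framing format, and that the captured file holds the unaltered request body. The name snappy_decompress is made up for illustration; a maintained Snappy library would be preferable for anything beyond a quick experiment.

```python
def snappy_decompress(data: bytes) -> bytes:
    """Decode a Snappy block-format buffer (illustrative, minimal error handling)."""
    # Preamble: uncompressed length as a little-endian base-128 varint.
    expected_len = 0
    shift = 0
    pos = 0
    while True:
        byte = data[pos]
        pos += 1
        expected_len |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7

    out = bytearray()
    while pos < len(data):
        tag = data[pos]
        pos += 1
        kind = tag & 0x03
        if kind == 0:
            # Literal: the upper 6 bits hold (length - 1), or 60..63 to signal
            # that (length - 1) follows in the next 1..4 bytes, little-endian.
            length = tag >> 2
            if length >= 60:
                nbytes = length - 59
                length = int.from_bytes(data[pos:pos + nbytes], "little")
                pos += nbytes
            length += 1
            if pos + length > len(data):
                raise ValueError("literal runs past end of input")
            out += data[pos:pos + length]
            pos += length
            continue
        if kind == 1:
            # Copy with 1-byte offset: 3-bit length (4..11), 11-bit offset.
            length = 4 + ((tag >> 2) & 0x07)
            offset = ((tag >> 5) << 8) | data[pos]
            pos += 1
        elif kind == 2:
            # Copy with 2-byte little-endian offset.
            length = (tag >> 2) + 1
            offset = int.from_bytes(data[pos:pos + 2], "little")
            pos += 2
        else:
            # Copy with 4-byte little-endian offset.
            length = (tag >> 2) + 1
            offset = int.from_bytes(data[pos:pos + 4], "little")
            pos += 4
        if offset == 0 or offset > len(out):
            raise ValueError("invalid copy offset")
        start = len(out) - offset
        # Copy byte by byte: source and destination may overlap.
        for i in range(length):
            out.append(out[start + i])

    if len(out) != expected_len:
        raise ValueError("decompressed length does not match preamble")
    return bytes(out)
```

If the captured body decompresses cleanly and the result then decodes with protoc --decode, that would point at the compression layer rather than the descriptor file. Truncated input may raise IndexError here instead of ValueError.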

jszwedko commented 5 months ago

Hmm, the first failure makes me think that the file isn't valid protobuf, which means it may not have passed through without alteration. I might try capturing it in a different way, maybe using netcat.
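As an alternative to netcat, a throwaway HTTP listener can write each request body to disk byte for byte, with no codec in between. The following is a sketch using only the Python standard library; make_capture_server, the output path, and the choice of a 204 response are assumptions made here for illustration, not anything Vector or Grafana Agent prescribes.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer


def make_capture_server(out_path: str, host: str = "127.0.0.1", port: int = 0) -> HTTPServer:
    """Build an HTTP server that saves each POST body to out_path unmodified.

    Hypothetical helper: port 0 lets the OS pick a free port; pass an explicit
    port (for example 8080) to match the agent's push URL.
    """

    class CaptureHandler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            body = self.rfile.read(length)
            # Binary mode, no decoding: the file holds exactly the bytes sent.
            # Each request overwrites the previous capture.
            with open(out_path, "wb") as f:
                f.write(body)
            # Reply with an empty success response so the client does not retry.
            self.send_response(204)
            self.end_headers()

        def log_message(self, fmt, *args):
            # Silence the default per-request logging on stderr.
            pass

    return HTTPServer((host, port), CaptureHandler)


# Example usage (not run automatically):
#   server = make_capture_server("payload.bin", "0.0.0.0", 8080)
#   server.serve_forever()
```

Pointing the agent at this listener instead of Vector should leave a payload.bin that can be inspected directly, which would rule out any alteration by the sink codecs.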