vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

`opentelemetry` source: gRPC log writes not being processed #14463

Open char8 opened 2 years ago

char8 commented 2 years ago

Problem

We're trying to ingest logs from the Envoy proxy's envoy.access_loggers.open_telemetry extension into Vector with the opentelemetry source.

Configuring Envoy to send logs directly to Vector shows H2 connections and data frames in the debug logs, but no log entries reach the sink and we see no errors. Metrics on the Envoy side show bytes written and no errors.

opentelemetry-collector can receive these logs over gRPC, and we can re-export them from there to Vector's opentelemetry gRPC source, which works fine.

Configuration

[api]
enabled = true

[sources.otel]
type = "opentelemetry"

[sources.otel.grpc]
address = "0.0.0.0:4317"

[sources.otel.http]
address = "0.0.0.0:4318"

[sinks.console]
inputs = ["otel.logs"]
target = "stdout"
type = "console"

[sinks.console.encoding]
codec = "json"

Version

vector 0.25.0 (x86_64-unknown-linux-gnu d498040 2022-09-18)

Debug Output

❯ RUST_BACKTRACE=full VECTOR_LOG=debug ./vector -vvv -c ../../vector.toml                                                                                                                                                                                                                                                                                                  
2022-09-18T20:19:50.023446Z  INFO vector::app: Log level is enabled. level="debug"
2022-09-18T20:19:50.023522Z  INFO vector::app: Loading configs. paths=["../../vector.toml"]
2022-09-18T20:19:50.024195Z DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
2022-09-18T20:19:50.024608Z DEBUG vector::topology::builder: Building new source. component=otel
2022-09-18T20:19:50.025015Z DEBUG vector::topology::builder: Building new sink component=console
2022-09-18T20:19:50.025111Z  INFO vector::topology::running: Running healthchecks.
2022-09-18T20:19:50.025125Z DEBUG vector::topology::running: Connecting changed/added component(s).
2022-09-18T20:19:50.025142Z DEBUG vector::topology::running: Configuring outputs for source. component=otel
2022-09-18T20:19:50.025174Z DEBUG vector::topology::running: Configuring output for component. component=otel output_id=Some("logs")
2022-09-18T20:19:50.025189Z DEBUG vector::topology::running: Connecting inputs for sink. component=console
2022-09-18T20:19:50.025203Z DEBUG vector::topology::running: Adding component input to fanout. component=console fanout_id=otel.logs
2022-09-18T20:19:50.025208Z  INFO vector::topology::builder: Healthcheck: Passed.
2022-09-18T20:19:50.025233Z DEBUG vector::topology::running: Spawning new source. key=otel
2022-09-18T20:19:50.025289Z DEBUG source{component_kind="source" component_id=otel component_type=opentelemetry component_name=otel}: vector::topology::builder: Source pump starting.
2022-09-18T20:19:50.025335Z  INFO vector: Vector has started. debug="false" version="0.25.0" arch="x86_64" build_id="d498040 2022-09-18"
2022-09-18T20:19:50.025356Z  INFO vector::internal_events::api: API server running. address=127.0.0.1:8686 playground=http://127.0.0.1:8686/playground
2022-09-18T20:19:50.025406Z  INFO source{component_kind="source" component_id=otel component_type=opentelemetry component_name=otel}: vector::sources::util::grpc: Building gRPC server. address=0.0.0.0:4317
2022-09-18T20:19:50.025573Z  INFO source{component_kind="source" component_id=otel component_type=opentelemetry component_name=otel}: vector::sources::opentelemetry::http: Building HTTP server. address=0.0.0.0:4318
2022-09-18T20:19:50.026530Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.025099934072720376
2022-09-18T20:19:52.118279Z DEBUG h2::codec::framed_write: send frame=Settings { flags: (0x0), initial_window_size: 1048576, max_frame_size: 16384, max_header_list_size: 16777216 }
2022-09-18T20:19:52.118441Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Settings { flags: (0x0), header_table_size: 4096, enable_push: 0, max_concurrent_streams: 2147483647, initial_window_size: 268435456, enable_connect_protocol: 0 }
2022-09-18T20:19:52.118459Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Settings { flags: (0x1: ACK) }
2022-09-18T20:19:52.118466Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 268369921 }
2022-09-18T20:19:52.118515Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
2022-09-18T20:19:52.118543Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }
2022-09-18T20:19:52.118551Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Settings { flags: (0x1: ACK) }
2022-09-18T20:19:52.118557Z DEBUG Connection{peer=Server}: h2::proto::settings: received settings ACK; applying Settings { flags: (0x0), initial_window_size: 1048576, max_frame_size: 16384, max_header_list_size: 16777216 }
2022-09-18T20:19:52.118569Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 983041 }
2022-09-18T20:19:53.628788Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }
2022-09-18T20:19:55.027120Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.0025100078055733882
2022-09-18T20:19:55.580393Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }

Example Data

Currently sending a fixed body {"foo": "baz"} with each log event. This is what the event looks like when logged by opentelemetry-collector.

opentelemetry_logs-otel-1      | 2022-09-18T20:25:02.747Z       info    LogsExporter    {"kind": "exporter", "data_type": "logs", "name": "logging", "#logs": 1}
opentelemetry_logs-otel-1      | 2022-09-18T20:25:02.747Z       info    ResourceLog #0
opentelemetry_logs-otel-1      | Resource SchemaURL: 
opentelemetry_logs-otel-1      | Resource labels:
opentelemetry_logs-otel-1      |      -> log_name: STRING(access_logs)
opentelemetry_logs-otel-1      |      -> zone_name: STRING()
opentelemetry_logs-otel-1      |      -> cluster_name: STRING()
opentelemetry_logs-otel-1      |      -> node_name: STRING()
opentelemetry_logs-otel-1      |      -> region: STRING(foo)
opentelemetry_logs-otel-1      | ScopeLogs #0
opentelemetry_logs-otel-1      | ScopeLogs SchemaURL: 
opentelemetry_logs-otel-1      | InstrumentationScope  
opentelemetry_logs-otel-1      | LogRecord #0
opentelemetry_logs-otel-1      | ObservedTimestamp: 1970-01-01 00:00:00 +0000 UTC
opentelemetry_logs-otel-1      | Timestamp: 2022-09-18 20:25:02.742701 +0000 UTC
opentelemetry_logs-otel-1      | Severity: 
opentelemetry_logs-otel-1      | Body: {
opentelemetry_logs-otel-1      |      -> foo: STRING(baz)
opentelemetry_logs-otel-1      | }
opentelemetry_logs-otel-1      | Trace ID: 
opentelemetry_logs-otel-1      | Span ID: 
opentelemetry_logs-otel-1      | Flags: 0
opentelemetry_logs-otel-1      |        {"kind": "exporter", "data_type": "logs", "name": "logging"}

When vector ingests these from opentelemetry-collector, the vector event written to the sink looks like:

{"dropped_attributes_count":0,"message":{"foo":"baz"},"observed_timestamp":"2022-09-18T20:25:02.748308562Z","resources":{"cluster_name":"","log_name":"access_logs","node_name":"","region":"foo","zone_name":""},"source_type":"opentelemetry","timestamp":"2022-09-18T20:25:02.742701Z"}                                                                                 

Additional Context

Ideally we'd like to avoid running opentelemetry-collector in front of Vector. Are there any further steps we can take to debug why Vector isn't ingesting the logs (and isn't showing any errors) when Envoy sends logs to it directly?

To get it to work, this is the opentelemetry-collector config we're using:

receivers:
  otlp:
    protocols:
      grpc:
      http:

exporters:
  logging:
    loglevel: debug
  otlp:
    endpoint: vector:4317
    tls:
      insecure: true

service:
  pipelines:
    logs:
      receivers: [otlp]
      exporters: [logging,otlp]

caibirdme commented 2 years ago

Which opentelemetry-proto version is the Envoy extension using? There's a breaking change in opentelemetry-proto, and Vector uses the latest version, so this may be caused by a proto mismatch. The OpenTelemetry Collector itself doesn't use the deprecated field, which is why Vector can recognize its output.
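
For illustration, here is a minimal, self-contained Rust sketch of how a proto mismatch like this can fail silently. It uses prost directly, with made-up message shapes and field numbers that are not the real opentelemetry-proto definitions: protobuf decoders skip unknown field numbers, so a payload written against a deprecated field can decode "successfully" into an empty request on a receiver built against the newer schema, producing no logs and no errors.

use prost::Message;

// Illustrative stand-ins only; not the real opentelemetry-proto types or tags.
#[derive(Clone, PartialEq, prost::Message)]
struct ScopeLogs {
    #[prost(string, repeated, tag = "1")]
    bodies: Vec<String>,
}

#[derive(Clone, PartialEq, prost::Message)]
struct OldResourceLogs {
    // The deprecated field an older sender might still populate.
    #[prost(message, repeated, tag = "1000")]
    instrumentation_library_logs: Vec<ScopeLogs>,
}

#[derive(Clone, PartialEq, prost::Message)]
struct NewResourceLogs {
    // The field a receiver built against the newer schema reads instead.
    #[prost(message, repeated, tag = "2")]
    scope_logs: Vec<ScopeLogs>,
}

fn main() {
    let old = OldResourceLogs {
        instrumentation_library_logs: vec![ScopeLogs {
            bodies: vec![r#"{"foo":"baz"}"#.to_string()],
        }],
    };
    let bytes = old.encode_to_vec();

    // Decoding with the new schema succeeds, but the payload sits in an
    // unknown field number and is silently skipped: no error, no logs.
    let new = NewResourceLogs::decode(bytes.as_slice()).expect("decodes fine");
    assert!(new.scope_logs.is_empty());
}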

char8 commented 2 years ago

Which opentelemetry-proto version is the Envoy extension using? There's a breaking change in opentelemetry-proto, and Vector uses the latest version, so this may be caused by a proto mismatch. The OpenTelemetry Collector itself doesn't use the deprecated field, which is why Vector can recognize its output.

Thanks! It does indeed look to be this. The patch for Envoy (https://github.com/envoyproxy/envoy/pull/22694) to stop using instrumentation_library_logs hasn't made it into a release yet. I'm building Envoy off the main branch; I'll test, confirm that it works, and resolve this issue.

spencergilbert commented 2 years ago

We need to document the version support (I'll check if there's an issue), and see what we can do to improve backward compatibility 👍

char8 commented 2 years ago

I tested against envoyproxy/envoy-dev:latest, same behaviour. I double-checked and it's using the 0.19 protos.

There's another oddity where sending SIGTERM to Vector doesn't terminate the otel component immediately. I'm not sure what it's waiting for, though. That problem doesn't happen when we're writing from the otel collector.

The resulting Vector debug logs are similar to before. Each Envoy log event shows up in the Vector log as:

2022-09-20T08:26:33.773745Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }

log extract

2022-09-20T08:26:20.213913Z DEBUG source{component_kind="source" component_id=otel component_type=opentelemetry component_name=otel}: vector::topology::builder: Source pump starting.
2022-09-20T08:26:20.213977Z  INFO vector: Vector has started. debug="false" version="0.25.0" arch="x86_64" build_id="a3de633 2022-09-20"
2022-09-20T08:26:20.213988Z  INFO vector::internal_events::api: API server running. address=127.0.0.1:8686 playground=http://127.0.0.1:8686/playground
2022-09-20T08:26:20.213972Z  INFO source{component_kind="source" component_id=otel component_type=opentelemetry component_name=otel}: vector::sources::util::grpc: Building gRPC server. address=0.0.0.0:4317
2022-09-20T08:26:20.214067Z DEBUG source{component_kind="source" component_id=internal_metrics component_type=internal_metrics component_name=internal_metrics}: vector::topology::builder: Source pump starting.
2022-09-20T08:26:20.214095Z  INFO source{component_kind="source" component_id=otel component_type=opentelemetry component_name=otel}: vector::sources::opentelemetry::http: Building HTTP server. address=0.0.0.0:4318
2022-09-20T08:26:20.214863Z DEBUG sink{component_kind="sink" component_id=metrics component_type=prometheus_exporter component_name=metrics}: vector::utilization: utilization=0.04758076766565722
2022-09-20T08:26:20.214876Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.0731003264962824
2022-09-20T08:26:25.215151Z DEBUG sink{component_kind="sink" component_id=metrics component_type=prometheus_exporter component_name=metrics}: vector::utilization: utilization=0.004824934719655026
2022-09-20T08:26:25.215155Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.007310052448541947
2022-09-20T08:26:25.270990Z DEBUG h2::codec::framed_write: send frame=Settings { flags: (0x0), initial_window_size: 1048576, max_frame_size: 16384, max_header_list_size: 16777216 }
2022-09-20T08:26:25.271161Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Settings { flags: (0x0) }
2022-09-20T08:26:25.271173Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Settings { flags: (0x1: ACK) }
2022-09-20T08:26:25.271184Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Settings { flags: (0x1: ACK) }
2022-09-20T08:26:25.271193Z DEBUG Connection{peer=Server}: h2::proto::settings: received settings ACK; applying Settings { flags: (0x0), initial_window_size: 1048576, max_frame_size: 16384, max_header_list_size: 16777216 }
2022-09-20T08:26:25.271213Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 983041 }
2022-09-20T08:26:28.348123Z DEBUG h2::codec::framed_write: send frame=Settings { flags: (0x0), initial_window_size: 1048576, max_frame_size: 16384, max_header_list_size: 16777216 }
2022-09-20T08:26:28.348218Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Settings { flags: (0x0), header_table_size: 4096, enable_push: 0, max_concurrent_streams: 2147483647, initial_window_size: 268435456, enable_connect_protocol: 0 }
2022-09-20T08:26:28.348227Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Settings { flags: (0x1: ACK) }
2022-09-20T08:26:28.348233Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 268369921 }
2022-09-20T08:26:28.348278Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
2022-09-20T08:26:28.348300Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }
2022-09-20T08:26:28.348308Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Settings { flags: (0x1: ACK) }
2022-09-20T08:26:28.348313Z DEBUG Connection{peer=Server}: h2::proto::settings: received settings ACK; applying Settings { flags: (0x0), initial_window_size: 1048576, max_frame_size: 16384, max_header_list_size: 16777216 }
2022-09-20T08:26:28.348319Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 983041 }
2022-09-20T08:26:30.215086Z DEBUG sink{component_kind="sink" component_id=metrics component_type=prometheus_exporter component_name=metrics}: vector::utilization: utilization=0.0005209354885902537
2022-09-20T08:26:30.215169Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.0007310234247616739
2022-09-20T08:26:30.336839Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }
2022-09-20T08:26:33.773745Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }
2022-09-20T08:26:35.215909Z DEBUG sink{component_kind="sink" component_id=metrics component_type=prometheus_exporter component_name=metrics}: vector::utilization: utilization=0.0001327414976254911
2022-09-20T08:26:35.215980Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.00007311853987349076
2022-09-20T08:26:35.391339Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }
2022-09-20T08:26:36.846455Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Data { stream_id: StreamId(1) }

>> sent SIGINT to vector

2022-09-20T08:26:38.467370Z  INFO vector: Vector has stopped.
2022-09-20T08:26:38.467464Z DEBUG source{component_kind="source" component_id=internal_metrics component_type=internal_metrics component_name=internal_metrics}: vector::topology::builder: Finished.
2022-09-20T08:26:38.467494Z DEBUG source{component_kind="source" component_id=internal_metrics component_type=internal_metrics component_name=internal_metrics}: vector::topology::builder: Source pump finished.
2022-09-20T08:26:38.467534Z DEBUG source{component_kind="source" component_id=otel component_type=opentelemetry component_name=otel}: hyper::server::shutdown: signal received, starting graceful shutdown
2022-09-20T08:26:38.467543Z DEBUG sink{component_kind="sink" component_id=metrics component_type=prometheus_exporter component_name=metrics}: vector::topology::builder: Finished.
2022-09-20T08:26:38.467582Z DEBUG sink{component_kind="sink" component_id=metrics component_type=prometheus_exporter component_name=metrics}: hyper::server::shutdown: signal received, starting graceful shutdown
2022-09-20T08:26:38.467593Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(2147483647) }
2022-09-20T08:26:38.467609Z DEBUG source{component_kind="source" component_id=otel component_type=opentelemetry component_name=otel}:Server::serve_incoming_with_graceful_shutdown: hyper::server::shutdown: signal received, starting graceful shutdown
2022-09-20T08:26:38.467613Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Ping { ack: false, payload: [11, 123, 162, 240, 139, 155, 254, 84] }
2022-09-20T08:26:38.467645Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(2147483647) }
2022-09-20T08:26:38.467659Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Ping { ack: false, payload: [11, 123, 162, 240, 139, 155, 254, 84] }
2022-09-20T08:26:38.467875Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=Ping { ack: true, payload: [11, 123, 162, 240, 139, 155, 254, 84] }
2022-09-20T08:26:38.467908Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=GoAway { error_code: NO_ERROR, last_stream_id: StreamId(1) }
2022-09-20T08:26:38.469161Z  INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="otel, console" time_remaining="59 seconds left"
2022-09-20T08:26:40.215146Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.000007328056690628141
2022-09-20T08:26:43.469175Z  INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="otel, console" time_remaining="54 seconds left"
2022-09-20T08:26:45.215520Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.0000007473845769437384
2022-09-20T08:26:48.469190Z  INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="otel, console" time_remaining="49 seconds left"
2022-09-20T08:26:50.215532Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.00000009471841318445308
2022-09-20T08:26:53.469667Z  INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="otel, console" time_remaining="44 seconds left"
2022-09-20T08:26:55.215562Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.000000023871753381202232
2022-09-20T08:26:58.469196Z  INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="otel, console" time_remaining="39 seconds left"
2022-09-20T08:27:00.215064Z DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.000000018768807089225854
char8 commented 2 years ago

Just checked the gRPC requests with Wireshark in both cases. Nothing stands out, except that the otel collector exports gzipped logs and populates a few fields that Envoy does not.

Envoy to Vector

[Screenshots: Wireshark captures of the Envoy → Vector gRPC request]

Open Telemetry collector to Vector

[Screenshots: Wireshark captures of the OpenTelemetry Collector → Vector gRPC request]

caibirdme commented 2 years ago

@char8 Vector's opentelemetry source accepts gzip when ingesting OTel logs (https://github.com/vectordotdev/vector/blob/master/src/sources/opentelemetry/mod.rs#L110), but maybe try making Envoy gzip the logs?

spencergilbert commented 1 year ago

Another user has run into this as well, and shared the following Envoy config to help us reproduce/troubleshoot.

EDIT: this config has some errors that need resolving (for example, the *circuit_breakers anchor isn't defined in this snippet)

!ignore parameters:
  - &access_log_address {address: vector, port_value: 8001}
static_resources:
  listeners:  
  - name: my_endpoint
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 80
    filter_chains:
    - filter_chain_match:
        destination_port: 80
      filters:
      - name: envoy.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          access_log:
          - name: envoy.access_loggers.open_telemetry
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.access_loggers.open_telemetry.v3.OpenTelemetryAccessLogConfig
              common_config:
                log_name: https_ingress_logs
                grpc_service:
                  timeout: 5s
                  envoy_grpc:
                    cluster_name: access_log
                    transport_api_version: v3
              body:
                kvlist_value:
                  values:
                    - key: "bytes_read_count"
                      value:
                        string_value: "%DOWNSTREAM_WIRE_BYTES_SENT%"

          route_config:
            name: local_route
            virtual_hosts:
            - name: my_local_server
              require_tls: NONE
              domains:
              - localhost*
              - localhost

              routes:
              - match:
                  prefix: "/"
                route:
                  cluster: my_local_http
                  timeout: 1s
  clusters:
    - name: access_log
      connect_timeout: 5s
      per_connection_buffer_limit_bytes: 32768
      type: STRICT_DNS
      typed_extension_protocol_options:
        envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
          "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
          common_http_protocol_options:
            idle_timeout: 50s
          explicit_http_config:
            http2_protocol_options: {} # lulzy but this instructs envoy to use http2 for grpc
      circuit_breakers: *circuit_breakers
      load_assignment:
        cluster_name: access_log
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address: *access_log_address
    - name: my_local_http
      connect_timeout: 5s
      type: STRICT_DNS
      load_assignment:
        cluster_name: my_local_http
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: localhost
                      port_value: 80

ADustyOldMuffin commented 1 year ago

@char8 Vector's opentelemetry source accepts gzip when ingesting OTel logs (https://github.com/vectordotdev/vector/blob/master/src/sources/opentelemetry/mod.rs#L110), but maybe try making Envoy gzip the logs?

This shouldn't matter, as tonic checks whether the request is actually compressed; using accept_compressed more or less just means that compressed requests are allowed, I believe.

spencergilbert commented 1 year ago

This shouldn't matter, as tonic checks whether the request is actually compressed; using accept_compressed more or less just means that compressed requests are allowed, I believe.

That is also my understanding; I would be surprised if that were the cause.
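
For reference, here is a minimal, self-contained Rust sketch of the semantics being described. It is not tonic's or Vector's actual code; it assumes the flate2 crate and uses a plain map as a stand-in for request metadata. The server only decompresses when the request's grpc-encoding header says the client actually compressed the message, so allowing gzip does not require senders to use it.

use flate2::read::GzDecoder;
use std::collections::HashMap;
use std::io::Read;

// Decompress the message body only if the client declared gzip encoding.
fn read_message(headers: &HashMap<String, String>, body: &[u8]) -> Vec<u8> {
    match headers.get("grpc-encoding").map(String::as_str) {
        // Compressed sender, e.g. the otel-collector exporter.
        Some("gzip") => {
            let mut out = Vec::new();
            GzDecoder::new(body)
                .read_to_end(&mut out)
                .expect("valid gzip payload");
            out
        }
        // No (or identity) encoding, e.g. Envoy's access logger: use as-is.
        _ => body.to_vec(),
    }
}

fn main() {
    // An uncompressed request is still read just fine.
    let headers = HashMap::new();
    assert_eq!(read_message(&headers, b"plain payload"), b"plain payload".to_vec());
}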

ADustyOldMuffin commented 1 year ago

So after troubleshooting a bit, it seems the request is making it to Vector and even gets as far as being decompressed. When running through this code (https://github.com/vectordotdev/vector/blob/c59d71cd76efa18807f772da4520208b4b5898f4/src/sources/util/grpc/decompression.rs#L223-L229), it seems to call send message and then never return (I'm not sure what's on the other end of the destination). From a TCP dump I can see Envoy sending the logs and Vector ACKing the request back, but then nothing comes of it.

I suspect Tonic doesn't believe the request is finished and is waiting for more, but I have no proof of this (and also don't know where to look).
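
For context on that suspicion, here is a self-contained sketch of gRPC's length-prefixed message framing, which any layer sitting between the wire and the codec has to keep consistent (the function and values below are illustrative, not Vector's code): each message is a 1-byte compressed flag, a 4-byte big-endian length, and then that many payload bytes. A decoder that has seen fewer payload bytes than the prefix promises simply keeps waiting rather than erroring, which would look exactly like a request that arrives but never produces output or a response.

// Parse one gRPC length-prefixed message from a buffer, if it is complete.
fn try_decode(buf: &[u8]) -> Option<(bool, &[u8])> {
    if buf.len() < 5 {
        return None; // header incomplete: keep waiting
    }
    let compressed = buf[0] == 1;
    let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    if buf.len() < 5 + len {
        return None; // payload incomplete: keep waiting
    }
    Some((compressed, &buf[5..5 + len]))
}

fn main() {
    // The prefix claims 4 payload bytes but only 2 have arrived:
    // no error, just silence while the decoder waits for the rest.
    assert!(try_decode(&[0, 0, 0, 0, 4, 0xde, 0xad]).is_none());

    // A consistent prefix and payload decode fine.
    assert_eq!(
        try_decode(&[0, 0, 0, 0, 2, 0xde, 0xad]),
        Some((false, &[0xde, 0xad][..]))
    );
}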