vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

"vector validate" is stuck when running with kafka source and elasticsearch sinks #20367

Open Skunnyk opened 4 months ago

Skunnyk commented 4 months ago


Problem

Hi, first, thank you for your work on vector :)

I have a problem with Vector 0.37.1 (Debian package, but I can trigger the problem with 0.36 too): vector validate gets stuck the majority of the time. It seems to be related to the combination of a kafka source and an elasticsearch sink; other configurations I have tested show no problem so far. This may be related to the discussion in issue #19333.

A minimal configuration which triggers the problem is attached below.

11:47:55 root@vector01.infra:/etc/vector# vector validate
√ Loaded ["/etc/vector/vector.yaml"]
√ Component configuration
√ Health check "es" <--- # stuck indefinitely
^C^Z <--- # no way to ctrl+c/stop, only kill -9 works
[2]+  Stopped                 vector validate

Sometimes it works:

11:47:57 root@vector01.infra:/etc/vector# vector validate
√ Loaded ["/etc/vector/vector.yaml"]
√ Component configuration
√ Health check "es"
------------------------------------
                           Validated

I can see an open kafka connection to port 9092 while validate is stuck. The offset=Invalid metadata="" in the DEBUG output looks strange. The kafka cluster itself works correctly.
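If it helps to compare on other setups: the lingering broker connection can be confirmed from another shell while validate hangs. A minimal check, assuming the broker port 9092 from the configuration below:

ss -tnp | grep 9092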

Thank you,

Configuration

sources:
  kafka:
    type: kafka
    bootstrap_servers: "broker1:9092,broker2:9092"
    group_id: vector
    topics:
      - syslog-vector

sinks:
  es:
    type: "elasticsearch"
    inputs: ["kafka"]
    endpoints:
      - "http://elasticsearch:9200/"
    mode: bulk
    bulk:
      index: "vector-%Y-%m-%d"

api:
  enabled: true
  address: "127.0.0.1:8686"

Version

0.37.1

Debug Output

2024-04-24T09:54:20.667711Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10                              
2024-04-24T09:54:20.667758Z  INFO vector::app: Log level is enabled. level="trace"                                                                                                                                   
2024-04-24T09:54:20.667838Z DEBUG vector::app: messaged="Building runtime." worker_threads=4                                                                                                                         
2024-04-24T09:54:20.667933Z TRACE mio::poll: registering event source with poller: token=Token(1), interests=READABLE    
√ Loaded ["/etc/vector/vector.yaml"]                                                                      
2024-04-24T09:54:20.672046Z DEBUG vector::topology::builder: Building new source. component=kafka                                                                                                                    
2024-04-24T09:54:20.673311Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::client: Create new librdkafka client 0x7f6489341400    
2024-04-24T09:54:20.673384Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::util: Destroying topic partition list: 0x7f64895f0530                                            
2024-04-24T09:54:20.673399Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::util: Destroyed topic partition list: 0x7f64895f0530
2024-04-24T09:54:20.673425Z DEBUG vector::topology::builder: Building new sink. component=es
2024-04-24T09:54:20.675601Z TRACE rdkafka::consumer::stream_consumer: Starting stream consumer wake loop: 0x7f6489341400
2024-04-24T09:54:20.681276Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T09:54:20.687608Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T09:54:20.693947Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://elasticsearch:9200/ method=GET version=HTTP/1.1 headers={"user-agent": "Vector/0.37.1 (x86_64-unknown-linux-gnu cb6635a 2024-04-09 13:45:06.561412437)", "accept-encoding": "identity"} body=[empty]                        
2024-04-24T09:54:20.694011Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::pool: checkout waiting for idle connection: ("http", elasticsearch:9200)
2024-04-24T09:54:20.694068Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: Http::connect; scheme=Some("http"), host=Some("elasticsearch"), port=Some(Port(9200))
2024-04-24T09:54:20.694949Z DEBUG hyper::client::connect::dns: resolving host="elasticsearch"
2024-04-24T09:54:20.697128Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: connecting to 10.20.0.100:9200
2024-04-24T09:54:20.697190Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: mio::poll: registering event source with poller: token=Token(140069773503104), interests=READABLE | WRITABLE
2024-04-24T09:54:20.697606Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: connected to 10.20.0.100:9200
2024-04-24T09:54:20.697634Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::conn: client handshake Http1
2024-04-24T09:54:20.697650Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::client: handshake complete, spawning background dispatcher task
2024-04-24T09:54:20.697726Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::pool: checkout dropped for ("http", elasticsearch:9200)
2024-04-24T09:54:20.697733Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Busy }
2024-04-24T09:54:20.697802Z TRACE encode_headers: hyper::proto::h1::role: Client::encode method=GET, body=None
2024-04-24T09:54:20.697838Z DEBUG hyper::proto::h1::io: flushed 185 bytes
2024-04-24T09:54:20.697876Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: KeepAlive, keep_alive: Busy }
2024-04-24T09:54:20.699867Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-04-24T09:54:20.699898Z TRACE hyper::proto::h1::io: received 653 bytes
2024-04-24T09:54:20.699981Z TRACE parse_headers: hyper::proto::h1::role: Response.parse bytes=653
2024-04-24T09:54:20.700021Z TRACE parse_headers: hyper::proto::h1::role: Response.parse Complete(121)
2024-04-24T09:54:20.700102Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-04-24T09:54:20.700174Z DEBUG hyper::proto::h1::conn: incoming body is content-length (532 bytes)
2024-04-24T09:54:20.700259Z TRACE hyper::proto::h1::decode: decode; state=Length(532)
2024-04-24T09:54:20.700280Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"x-elastic-product": "Elasticsearch", "content-type": "application/json; charset=UTF-8", "content-length": "532"} body=[532 bytes]
2024-04-24T09:54:20.700310Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-04-24T09:54:20.700403Z TRACE hyper::proto::h1::conn: maybe_notify; read_from_io blocked
2024-04-24T09:54:20.700458Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T09:54:20.700480Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T09:54:20.700508Z TRACE hyper::client::pool: pool dropped, dropping pooled (("http", elasticsearch:9200))
2024-04-24T09:54:20.700575Z TRACE hyper::proto::h1::dispatch: client tx closed
2024-04-24T09:54:20.700601Z TRACE hyper::proto::h1::conn: State::close_read()
2024-04-24T09:54:20.700633Z TRACE hyper::proto::h1::conn: State::close_write()
2024-04-24T09:54:20.700691Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Closed, writing: Closed, keep_alive: Disabled }
2024-04-24T09:54:20.700713Z TRACE hyper::proto::h1::conn: shut down IO complete
2024-04-24T09:54:20.700780Z TRACE mio::poll: deregistering event source from poller    
2024-04-24T09:54:20.701133Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector::sinks::elasticsearch::common: Auto-detected Elasticsearch API version. version=7
2024-04-24T09:54:20.706750Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T09:54:20.712262Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
√ Component configuration
2024-04-24T09:54:20.718159Z TRACE tower::buffer::worker: worker polling for next message
2024-04-24T09:54:20.718204Z TRACE vector::validate: Healthcheck for es starting.
2024-04-24T09:54:20.718317Z DEBUG http: vector::internal_events::http_client: Sending HTTP request. uri=http://elasticsearch:9200/_cluster/health method=GET version=HTTP/1.1 headers={"user-agent": "Vector/0.37.1 (x86_64-unknown-linux-gnu cb6635a 2024-04-09 13:45:06.561412437)", "accept-encoding": "identity"} body=[empty]
2024-04-24T09:54:20.718354Z TRACE http: hyper::client::pool: checkout waiting for idle connection: ("http", elasticsearch:9200)
2024-04-24T09:54:20.718411Z TRACE http: hyper::client::connect::http: Http::connect; scheme=Some("http"), host=Some("elasticsearch"), port=Some(Port(9200))
2024-04-24T09:54:20.718506Z DEBUG hyper::client::connect::dns: resolving host="elasticsearch"
2024-04-24T09:54:20.719112Z DEBUG http: hyper::client::connect::http: connecting to 10.20.0.100:9200
2024-04-24T09:54:20.719162Z TRACE http: mio::poll: registering event source with poller: token=Token(140069749659520), interests=READABLE | WRITABLE    
2024-04-24T09:54:20.719604Z DEBUG http: hyper::client::connect::http: connected to 10.20.0.100:9200
2024-04-24T09:54:20.719726Z TRACE http: hyper::client::conn: client handshake Http1
2024-04-24T09:54:20.720184Z TRACE http: hyper::client::client: handshake complete, spawning background dispatcher task
2024-04-24T09:54:20.720346Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Busy }
2024-04-24T09:54:20.720492Z TRACE http: hyper::client::pool: checkout dropped for ("http", elasticsearch:9200)
2024-04-24T09:54:20.720646Z TRACE encode_headers: hyper::proto::h1::role: Client::encode method=GET, body=None
2024-04-24T09:54:20.720809Z DEBUG hyper::proto::h1::io: flushed 200 bytes
2024-04-24T09:54:20.720947Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: KeepAlive, keep_alive: Busy }
2024-04-24T09:54:20.739522Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-04-24T09:54:20.739536Z TRACE hyper::proto::h1::io: received 510 bytes
2024-04-24T09:54:20.739545Z TRACE parse_headers: hyper::proto::h1::role: Response.parse bytes=510
2024-04-24T09:54:20.739555Z TRACE parse_headers: hyper::proto::h1::role: Response.parse Complete(121)
2024-04-24T09:54:20.739566Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-04-24T09:54:20.739571Z DEBUG hyper::proto::h1::conn: incoming body is content-length (389 bytes)
2024-04-24T09:54:20.739581Z TRACE hyper::proto::h1::decode: decode; state=Length(389)
2024-04-24T09:54:20.739586Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-04-24T09:54:20.739591Z TRACE hyper::proto::h1::conn: maybe_notify; read_from_io blocked
2024-04-24T09:54:20.739599Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T09:54:20.739604Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T09:54:20.739616Z TRACE http: hyper::client::pool: put; add idle connection for ("http", elasticsearch:9200)
2024-04-24T09:54:20.739624Z DEBUG http: hyper::client::pool: pooling idle connection for ("http", elasticsearch:9200)
2024-04-24T09:54:20.739642Z DEBUG http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"x-elastic-product": "Elasticsearch", "content-type": "application/json; charset=UTF-8", "content-length": "389"} body=[389 bytes]
2024-04-24T09:54:20.739670Z  INFO vector::topology::builder: Healthcheck passed.
2024-04-24T09:54:20.739684Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
√ Health check "es"
2024-04-24T09:54:20.739712Z TRACE vector::validate: Healthcheck for es done.
2024-04-24T09:54:20.739764Z TRACE rdkafka::util: Destroying queue: 0x7f64894504c0    
2024-04-24T09:54:20.739776Z TRACE rdkafka::util: Destroyed queue: 0x7f64894504c0    
2024-04-24T09:54:20.739788Z TRACE rdkafka::consumer::base_consumer: Destroying consumer: 0x7f6489341400     
2024-04-24T09:54:20.739815Z TRACE rdkafka::consumer: Running pre-rebalance with Assign(TPL {syslog-vector/0: offset=Invalid metadata="", error=Ok(()); syslog-vector/1: offset=Invalid metadata="", error=Ok(()); syslog-vector/2: offset=Invalid metadata="", error=Ok(()); syslog-vector/3: offset=Invalid metadata="", error=Ok(()); syslog-vector/4: offset=Invalid metadata="", error=Ok(()); syslog-vector/5: offset=Invalid metadata="", error=Ok(()); syslog-vector/6: offset=Invalid metadata="", error=Ok(()); syslog-vector/7: offset=Invalid metadata="", error=Ok(()); syslog-vector/8: offset=Invalid metadata="", error=Ok(()); syslog-vector/9: offset=Invalid metadata="", error=Ok(())})
2024-04-24T09:54:20.739819Z TRACE tower::buffer::worker: worker polling for next message
2024-04-24T09:54:20.739874Z TRACE tower::buffer::worker: buffer already closed
2024-04-24T09:54:20.740630Z TRACE hyper::client::pool: pool closed, canceling idle interval
2024-04-24T09:54:20.740716Z TRACE hyper::proto::h1::dispatch: client tx closed
2024-04-24T09:54:20.740825Z TRACE hyper::proto::h1::conn: State::close_read()
2024-04-24T09:54:20.740838Z TRACE hyper::proto::h1::conn: State::close_write()
2024-04-24T09:54:20.740848Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Closed, writing: Closed, keep_alive: Disabled }
2024-04-24T09:54:20.740875Z TRACE hyper::proto::h1::conn: shut down IO complete
2024-04-24T09:54:20.740894Z TRACE mio::poll: deregistering event source from poller  
<stuck here>

References

#19333

Skunnyk commented 4 months ago

A trace from a run where validate works (once in a while, random behavior); we can see that everything is fine on the rdkafka side :thinking:

2024-04-24T10:09:24.856154Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10                              
2024-04-24T10:09:24.856210Z  INFO vector::app: Log level is enabled. level="trace"                                                                                                                                   
2024-04-24T10:09:24.856264Z DEBUG vector::app: messaged="Building runtime." worker_threads=4                                                                                                                         
2024-04-24T10:09:24.856354Z TRACE mio::poll: registering event source with poller: token=Token(1), interests=READABLE    
√ Loaded ["/etc/vector/vector.yaml"]                                                                      
2024-04-24T10:09:24.860493Z DEBUG vector::topology::builder: Building new source. component=kafka                                                                                                                    
2024-04-24T10:09:24.861738Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::client: Create new librdkafka client 0x7f28d6141400    
2024-04-24T10:09:24.861797Z TRACE rdkafka::consumer::stream_consumer: Starting stream consumer wake loop: 0x7f28d6141400    
2024-04-24T10:09:24.861885Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::util: Destroying topic partition list: 0x7f28d63f0530                           
2024-04-24T10:09:24.861906Z TRACE source{component_kind="source" component_id=kafka component_type=kafka}: rdkafka::util: Destroyed topic partition list: 0x7f28d63f0530    
2024-04-24T10:09:24.861938Z DEBUG vector::topology::builder: Building new sink. component=es                                                                                                                         
2024-04-24T10:09:24.869500Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T10:09:24.877121Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T10:09:24.883296Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://elasticsearch:9200/ method=GET version=HTTP/1.1 headers={"user-agent": "Vector/0.37.1 (x86_64-unknown-linux-gnu cb6635a 2024-04-09 13:45:06.561412437)", "accept-encoding": "identity"} body=[empty]                        
2024-04-24T10:09:24.883361Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::pool: checkout waiting for idle connection: ("http", elasticsearch:9200)   
2024-04-24T10:09:24.883414Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: Http::connect; scheme=Some("http"), host=Some("elasticsearch"), port=Some(Port(9200))
2024-04-24T10:09:24.884344Z DEBUG hyper::client::connect::dns: resolving host="elasticsearch"
2024-04-24T10:09:24.886658Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: connecting to 10.20.0.100:9200                                        
2024-04-24T10:09:24.886716Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: mio::poll: registering event source with poller: token=Token(139813365213824), interests=READABLE | WRITABLE
2024-04-24T10:09:24.887021Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::connect::http: connected to 10.20.0.100:9200
2024-04-24T10:09:24.887047Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::conn: client handshake Http1
2024-04-24T10:09:24.887061Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::client: handshake complete, spawning background dispatcher task
2024-04-24T10:09:24.887128Z TRACE sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: hyper::client::pool: checkout dropped for ("http", elasticsearch:9200)
2024-04-24T10:09:24.887138Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Busy }
2024-04-24T10:09:24.887201Z TRACE encode_headers: hyper::proto::h1::role: Client::encode method=GET, body=None
2024-04-24T10:09:24.887244Z DEBUG hyper::proto::h1::io: flushed 185 bytes
2024-04-24T10:09:24.887257Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: KeepAlive, keep_alive: Busy }
2024-04-24T10:09:24.889167Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-04-24T10:09:24.889189Z TRACE hyper::proto::h1::io: received 653 bytes
2024-04-24T10:09:24.889221Z TRACE parse_headers: hyper::proto::h1::role: Response.parse bytes=653
2024-04-24T10:09:24.889241Z TRACE parse_headers: hyper::proto::h1::role: Response.parse Complete(121)
2024-04-24T10:09:24.889258Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-04-24T10:09:24.889269Z DEBUG hyper::proto::h1::conn: incoming body is content-length (532 bytes)
2024-04-24T10:09:24.889296Z TRACE hyper::proto::h1::decode: decode; state=Length(532)
2024-04-24T10:09:24.889346Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-04-24T10:09:24.889361Z TRACE hyper::proto::h1::conn: maybe_notify; read_from_io blocked
2024-04-24T10:09:24.889326Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}:http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"x-elastic-product": "Elasticsearch", "content-type": "application/json; charset=UTF-8", "content-length": "532"} body=[532 bytes]
2024-04-24T10:09:24.889382Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T10:09:24.890196Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T10:09:24.890212Z TRACE hyper::client::pool: pool dropped, dropping pooled (("http", elasticsearch:9200))
2024-04-24T10:09:24.890232Z TRACE hyper::proto::h1::dispatch: client tx closed
2024-04-24T10:09:24.890241Z TRACE hyper::proto::h1::conn: State::close_read()
2024-04-24T10:09:24.890249Z TRACE hyper::proto::h1::conn: State::close_write()
2024-04-24T10:09:24.890261Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Closed, writing: Closed, keep_alive: Disabled }
2024-04-24T10:09:24.890289Z TRACE hyper::proto::h1::conn: shut down IO complete
2024-04-24T10:09:24.890304Z TRACE mio::poll: deregistering event source from poller    
2024-04-24T10:09:24.890353Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector::sinks::elasticsearch::common: Auto-detected Elasticsearch API version. version=7
2024-04-24T10:09:24.895920Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T10:09:24.901408Z DEBUG sink{component_kind="sink" component_id=es component_type=elasticsearch}: vector_core::tls::settings: Fetching system root certs.
2024-04-24T10:09:24.907324Z TRACE tower::buffer::worker: worker polling for next message
√ Component configuration
2024-04-24T10:09:24.907394Z TRACE vector::validate: Healthcheck for es starting.
2024-04-24T10:09:24.907435Z DEBUG http: vector::internal_events::http_client: Sending HTTP request. uri=http://elasticsearch:9200/_cluster/health method=GET version=HTTP/1.1 headers={"user-agent": "Vector/0.37.1 (x86_64-unknown-linux-gnu cb6635a 2024-04-09 13:45:06.561412437)", "accept-encoding": "identity"} body=[empty]
2024-04-24T10:09:24.907461Z TRACE http: hyper::client::pool: checkout waiting for idle connection: ("http", elasticsearch:9200)
2024-04-24T10:09:24.907522Z TRACE http: hyper::client::connect::http: Http::connect; scheme=Some("http"), host=Some("elasticsearch"), port=Some(Port(9200))
2024-04-24T10:09:24.907563Z DEBUG hyper::client::connect::dns: resolving host="elasticsearch"
2024-04-24T10:09:24.908195Z DEBUG http: hyper::client::connect::http: connecting to 10.20.0.100:9200
2024-04-24T10:09:24.908243Z TRACE http: mio::poll: registering event source with poller: token=Token(139813332916352), interests=READABLE | WRITABLE    
2024-04-24T10:09:24.908561Z DEBUG http: hyper::client::connect::http: connected to 10.20.0.100:9200
2024-04-24T10:09:24.908581Z TRACE http: hyper::client::conn: client handshake Http1
2024-04-24T10:09:24.908593Z TRACE http: hyper::client::client: handshake complete, spawning background dispatcher task
2024-04-24T10:09:24.908606Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Busy }
2024-04-24T10:09:24.908615Z TRACE http: hyper::client::pool: checkout dropped for ("http", elasticsearch:9200)
2024-04-24T10:09:24.908630Z TRACE encode_headers: hyper::proto::h1::role: Client::encode method=GET, body=None
2024-04-24T10:09:24.908652Z DEBUG hyper::proto::h1::io: flushed 200 bytes
2024-04-24T10:09:24.908658Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: KeepAlive, keep_alive: Busy }
2024-04-24T10:09:24.927680Z TRACE hyper::proto::h1::conn: Conn::read_head
2024-04-24T10:09:24.927697Z TRACE hyper::proto::h1::io: received 510 bytes
2024-04-24T10:09:24.927706Z TRACE parse_headers: hyper::proto::h1::role: Response.parse bytes=510
2024-04-24T10:09:24.927714Z TRACE parse_headers: hyper::proto::h1::role: Response.parse Complete(121)
2024-04-24T10:09:24.927727Z DEBUG hyper::proto::h1::io: parsed 3 headers
2024-04-24T10:09:24.927732Z DEBUG hyper::proto::h1::conn: incoming body is content-length (389 bytes)
2024-04-24T10:09:24.927742Z TRACE hyper::proto::h1::decode: decode; state=Length(389)
2024-04-24T10:09:24.927747Z DEBUG hyper::proto::h1::conn: incoming body completed
2024-04-24T10:09:24.927752Z TRACE hyper::proto::h1::conn: maybe_notify; read_from_io blocked
2024-04-24T10:09:24.927759Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T10:09:24.927765Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
2024-04-24T10:09:24.927776Z TRACE http: hyper::client::pool: put; add idle connection for ("http", elasticsearch:9200)
2024-04-24T10:09:24.927786Z DEBUG http: hyper::client::pool: pooling idle connection for ("http", elasticsearch:9200)
2024-04-24T10:09:24.927802Z DEBUG http: vector::internal_events::http_client: HTTP response. status=200 OK version=HTTP/1.1 headers={"x-elastic-product": "Elasticsearch", "content-type": "application/json; charset=UTF-8", "content-length": "389"} body=[389 bytes]   
2024-04-24T10:09:24.927834Z  INFO vector::topology::builder: Healthcheck passed.
2024-04-24T10:09:24.927846Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Init, writing: Init, keep_alive: Idle }
√ Health check "es"
2024-04-24T10:09:24.927875Z TRACE vector::validate: Healthcheck for es done.
2024-04-24T10:09:24.927917Z TRACE rdkafka::util: Destroying queue: 0x7f28d62504c0    
2024-04-24T10:09:24.927920Z TRACE tower::buffer::worker: worker polling for next message
2024-04-24T10:09:24.927930Z TRACE rdkafka::util: Destroyed queue: 0x7f28d62504c0    
2024-04-24T10:09:24.927939Z TRACE tower::buffer::worker: buffer already closed
2024-04-24T10:09:24.927947Z TRACE rdkafka::consumer::base_consumer: Destroying consumer: 0x7f28d6141400     
2024-04-24T10:09:24.928702Z TRACE hyper::client::pool: pool closed, canceling idle interval
2024-04-24T10:09:24.928720Z TRACE hyper::proto::h1::dispatch: client tx closed
2024-04-24T10:09:24.928725Z TRACE hyper::proto::h1::conn: State::close_read()
2024-04-24T10:09:24.928729Z TRACE hyper::proto::h1::conn: State::close_write()
2024-04-24T10:09:24.928734Z TRACE hyper::proto::h1::conn: flushed({role=client}): State { reading: Closed, writing: Closed, keep_alive: Disabled }
2024-04-24T10:09:24.928764Z TRACE hyper::proto::h1::conn: shut down IO complete
2024-04-24T10:09:24.928772Z TRACE mio::poll: deregistering event source from poller    
2024-04-24T10:09:24.964436Z TRACE rdkafka::consumer::base_consumer: Consumer destroyed: 0x7f28d6141400    
2024-04-24T10:09:24.964460Z TRACE rdkafka::util: Destroying client: 0x7f28d6141400    
2024-04-24T10:09:24.964984Z TRACE rdkafka::util: Destroyed client: 0x7f28d6141400    
2024-04-24T10:09:24.965053Z TRACE rdkafka::consumer::stream_consumer: Shut down stream consumer wake loop: 0x7f28d6141400    
------------------------------------
                           Validated
Skunnyk commented 4 months ago

I'm a bit speechless. I can't really reproduce the problem on another setup. After some investigation, I decided to add latency on the network interface, on intuition: we have 0.2ms between Vector and our source and sink (yes, a good network).

Adding 15ms of delay on the Vector host:

tc qdisc add dev ens192  root netem delay 15ms

And, magic: vector validate works every time. Wow. If I remove the delay, it gets stuck again.
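To undo the workaround afterwards, the netem qdisc can be removed again (assuming the same ens192 interface):

tc qdisc del dev ens192 root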

Can somebody with the problem try this workaround? :pray:

jszwedko commented 4 months ago

Huh, now that is strange. Are Kafka and Elasticsearch already up and running when Vector starts up?

Skunnyk commented 4 months ago

Yes, they are up during validate and during startup.

vector validate also tests the sources, even though there is no output about that in the console.
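For anyone who wants to capture similar traces, the debug output above was taken at trace level; one way to get a one-off verbose run, assuming the standard VECTOR_LOG environment variable:

VECTOR_LOG=trace vector validate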

Note: if I run vector normally, everything is OK; it consumes from Kafka and writes to ES.