streamnative / pulsar-rs

Rust Client library for Apache Pulsar

Consumer gets stuck indefinitely in some cases where Pulsar/ZK restarts #189

Open vmalloc opened 2 years ago

vmalloc commented 2 years ago

Somewhat related to #164, I'm experiencing a similar hang, but on the consumer side. When using the pulsar consumer as a Stream, after certain occasions in which ZK nodes fail (in my case after experiencing errors), the pulsar cluster recovers just fine but the client gets stuck, hanging the entire task that called next(). This is especially severe because running try_next in a select! loop, as I do, causes the entire select branch to hang, meaning the reactive loop stops responding to waking futures.

The logs don't say much except for these lines that get repeated several times around the time of the hang:

ERROR pulsar::consumer] Consumer: messages::next: returning Disconnected
INFO  pulsar::connection_manager] Connected n°5991518907389693436 to pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:6650 via proxy pulsar://pulsar-proxy.pulsar.svc in 78ms
ERROR pulsar::connection_manager] could not ping connection -7903802395670970975 to the server at pulsar://pulsar-proxy.pulsar.svc: Disconnected

Restarting the service altogether (causing a complete reconnect) seems to have fixed it completely, so I guess there must be a bug in the reconnection process.

EDIT: I'm using the latest version of the client library (4.1.1)
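
Until the root cause is fixed, one possible mitigation for the hang described above is to bound every wait for the next message, so that a stalled consumer shows up as an explicit state instead of a silent hang. The sketch below is only an illustration: it uses a standard-library channel as a stand-in for the consumer stream, and `Wait` and `wait_with_deadline` are hypothetical names, not part of pulsar-rs. In async code the analogous step would be wrapping the `try_next()` future in a timeout such as `tokio::time::timeout`. This only helps if the wait is merely pending; it cannot help if the call blocks the executor thread itself.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Outcome of one bounded wait for a message (hypothetical type).
#[derive(Debug, PartialEq)]
enum Wait<T> {
    /// A message arrived before the deadline.
    Message(T),
    /// Nothing arrived in time; the caller can service other work
    /// or decide that the consumer is stalled.
    Stalled,
    /// The sending side is gone; the consumer has to be rebuilt.
    Closed,
}

/// Waits at most `deadline` for the next message instead of blocking forever.
/// The channel stands in for the consumer stream (assumption for illustration).
fn wait_with_deadline<T>(rx: &Receiver<T>, deadline: Duration) -> Wait<T> {
    match rx.recv_timeout(deadline) {
        Ok(msg) => Wait::Message(msg),
        Err(RecvTimeoutError::Timeout) => Wait::Stalled,
        Err(RecvTimeoutError::Disconnected) => Wait::Closed,
    }
}
```

A caller that sees `Stalled` several times in a row, or `Closed` once, could then drop the consumer and build a new one rather than waiting indefinitely.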

vmalloc commented 2 years ago

FYI @Geal, I managed to reproduce this very easily with a single producer and a single consumer against a locally running pulsar (I used https://github.com/vmalloc/pulsar-cli to do that):

  1. Start both consumer and producer
  2. Kill the pulsar instance with SIGKILL (may need to try a couple of times before it reproduces)
  3. After a very small number of attempts, the consumer emits the following logs:
    [2022-02-15T10:28:06Z DEBUG pulsar::connection] Connecting to pulsar://127.0.0.1: 127.0.0.1:6650
    [2022-02-15T10:28:06Z DEBUG pulsar::consumer] consumer engine stopped: Err(ServiceDiscovery(Connection(Io(Os { code: 61, kind: ConnectionRefused, message: "Connection refused" }))))
    [2022-02-15T10:28:06Z ERROR pulsar::consumer] could not close consumer Some("pulsar-cli")(14550240535650196156) for topic test2: Disconnected

After that, the consumer stream continuously yields None as the message, even though the server recovers pretty quickly.
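
Since a full restart (and therefore a fresh connection) makes the problem go away, a caller-side workaround is to treat the end of the stream as a signal to rebuild the consumer rather than to keep polling it. The following is a minimal sketch under stated assumptions, not the library's reconnection logic: `consume_with_rebuild` and its `connect` factory are hypothetical names, and a plain iterator stands in for the consumer stream. `connect` returns `None` while the broker is still unreachable.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Drains messages from a consumer, rebuilding it whenever the stream ends.
///
/// `connect` is a hypothetical factory standing in for whatever builds a
/// fresh consumer. Makes at most `max_attempts` connection attempts and
/// returns the number of messages handled.
fn consume_with_rebuild<C, I, T>(
    mut connect: C,
    mut handle: impl FnMut(T),
    max_attempts: u32,
    base_delay: Duration,
) -> usize
where
    C: FnMut() -> Option<I>,
    I: Iterator<Item = T>,
{
    let mut handled = 0;
    let mut delay = base_delay;
    for _ in 0..max_attempts {
        match connect() {
            Some(stream) => {
                // A working stream resets the backoff.
                delay = base_delay;
                for msg in stream {
                    handle(msg);
                    handled += 1;
                }
                // The stream ended (yielded None): loop around and rebuild.
            }
            None => {
                // Broker not back yet: wait, then double the delay, capped at 30 s.
                sleep(delay);
                delay = (delay * 2).min(Duration::from_secs(30));
            }
        }
    }
    handled
}
```

The attempt limit keeps the sketch finite; a long-running service would more likely loop until shutdown and log each rebuild.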

vmalloc commented 2 years ago

@Geal I'm considering diving into this issue myself to help resolve it. Just making sure - will you be available for PR approval if the need arises?

DonghunLouisLee commented 2 years ago

@vmalloc Hi! I'll be available for PR approval if you can resolve this issue! Thanks