fsprojects / pulsar-client-dotnet

Apache Pulsar native client for .NET (C#/F#/VB)

Connection is not reestablished after network or service issue #238

Closed. baumerik closed this issue 1 month ago.

baumerik commented 10 months ago

Hi Lanayx,

Thanks for this great library for connecting to a Pulsar cluster and making integration in .NET so easy.

Currently we have some issues when there are connection troubles. Sometimes the Pulsar server has issues (e.g. a broker restart in case of internal memory leaks) or the network connection itself is cut. The problem in our case is that the consumer does not resume receiving messages. We see this problem mostly when we work with retry and dead letter topics. At the moment we have no repro, and not even all of our distributed services that act as Pulsar message consumers have the issue at the same time.

So our idea is to get a "more" stable environment by working with classic health checks: if a check fails for a defined timespan, we just restart the whole service. For that we need information on the consumer about whether the connection is still alive. (The producer is less important, because there we get an exception.) At the moment the only option I see is to decide based on LastDisconnectedTimestamp whether there was an issue.
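
As an illustration of that health-check idea, here is a minimal sketch. Everything in it is hypothetical application code, not a library API: the "disconnected since" value has to be supplied by the application itself (for example derived from LastDisconnectedTimestamp, or recorded whenever a receive fails and cleared when one succeeds).

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

// Hypothetical liveness check: report Unhealthy once the consumer has been
// disconnected for longer than the allowed downtime, so the orchestrator can
// restart the pod.
public sealed class PulsarConsumerHealthCheck : IHealthCheck
{
    private readonly Func<DateTime?> getDisconnectedSinceUtc; // null = currently connected
    private readonly TimeSpan maxDowntime;

    public PulsarConsumerHealthCheck(Func<DateTime?> getDisconnectedSinceUtc, TimeSpan maxDowntime)
    {
        this.getDisconnectedSinceUtc = getDisconnectedSinceUtc;
        this.maxDowntime = maxDowntime;
    }

    public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        var downSince = getDisconnectedSinceUtc();
        if (downSince is null)
            return Task.FromResult(HealthCheckResult.Healthy("Consumer connected"));

        var downFor = DateTime.UtcNow - downSince.Value;
        return Task.FromResult(downFor > maxDowntime
            ? HealthCheckResult.Unhealthy($"Consumer disconnected for {downFor}")
            : HealthCheckResult.Degraded($"Consumer disconnected for {downFor}"));
    }
}

With a check like this registered as a liveness probe, Kubernetes (or any other orchestrator) restarts the pod once the consumer has been down for longer than the configured threshold, which is exactly the "just restart the whole service" fallback described above.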

What I have discovered so far: the internal ConnectionState is Ready while the Connection of the ConnectionPool is in an exceptional state. But none of the internal exceptions are currently catchable for me.

Is it possible to get clear information about the connection state? Or do you have another option for me based on existing functionality that I don't know about?

Thanks for helping me. Erik

Lanayx commented 10 months ago

Hi! Thanks for the kind words.

> What I have discovered so far: the internal ConnectionState is Ready while the Connection of the ConnectionPool is in an exceptional state. But none of the internal exceptions are currently catchable for me.

When I debug such issues, logging is the best friend. Almost all transitions of all components are logged in debug mode, so you should eventually be able to see the sequence of logs that leads to the situation you described. I've just checked the code and I don't see why this state could persist, but I might be missing something. It does happen briefly as part of the normal flow: when the connection is faulted, a message is sent to the consumer to reconnect, and only when this message reaches the connection handler does the reconnect process start; until then the consumer is still in the Ready state, but it should eventually change to Connecting.
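
For reference, a minimal sketch of turning on debug-level logging with Microsoft.Extensions.Logging is shown below. How the resulting ILogger is handed to the Pulsar client depends on the library version, so treat the last line as an assumption and verify the exact hook against the pulsar-client-dotnet README.

using Microsoft.Extensions.Logging;
using Pulsar.Client.Api;

// Logger factory that emits Debug-level output (the level at which component
// state transitions are logged) to the console.
var loggerFactory = LoggerFactory.Create(builder =>
    builder.SetMinimumLevel(LogLevel.Debug).AddConsole());

// Assumption: the client exposes a static logging hook named Logger; check the
// README of your pulsar-client-dotnet version for the exact member.
PulsarClient.Logger = loggerFactory.CreateLogger("PulsarClient");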

baumerik commented 10 months ago

Yes, I have increased our logging, so hopefully we will get more information next time.

A change for the ConnectionState would be a good solution. Please expose it as a public member on the consumer and producer interfaces, so we can react to it too and, if reconnecting is not successful, just restart the entire pod.

I really want to help you, but my F# skills are too weak; even reading and understanding the code is a challenge for me.

Lanayx commented 10 months ago

Unfortunately I don't think exposing ConnectionState is a good idea, at least because Consumer is an abstraction and the connection state is different for partitioned and regular consumers. Also, I'd like to stick to the Java API and not create custom APIs, to avoid supporting additional documentation and unknown behaviour. The good news is that Java supports something similar, the isConnected property (https://pulsar.apache.org/api/client/3.1.x/org/apache/pulsar/client/api/Consumer.html#isConnected()), so we can add it as a separate feature request issue. While I'm no longer implementing new features myself (unless some company wants to pay for it via GitHub sponsorship), I'll support you on both the architecture and the F# side if you want to implement it. There are also excellent F# groups on Discord, Slack and Telegram ready to help. I think I have to add an architecture document to the repo anyway, so stay tuned :)

Lanayx commented 10 months ago

https://github.com/fsprojects/pulsar-client-dotnet/issues/239 - created the feature issue

Lanayx commented 10 months ago

Added architecture diagram

baumerik commented 10 months ago

Thank you for the architecture diagram. I understand why you want to stay aligned with the Java version. I will make an attempt to find a solution.

Last evening we had a migration on our Pulsar cluster, and I have some logs for you:

- "producer(54, , -1) ConnectionHandler" Closed connection to "persistent://customer-service-production/address-service/shipping-address-validation-cancelled-v5" Current state "Ready
  clientCnx(72, LogicalAddres Unspecified/pulsar-broker-3.pulsar-broker.pulsar.svc.cluster.local:6651)" -- Will try again in 100ms 
- "clientCnx(73, LogicalAddres Unspecified/pulsar-broker-3.pulsar-broker.pulsar.svc.cluster.local:6651)" Connected ProtocolVersion: 19 ServerVersion: "Pulsar Server2.9.2" MaxMessageSize: 5242880
- "clientCnx(73, LogicalAddres Unspecified/pulsar-broker-3.pulsar-broker.pulsar.svc.cluster.local:6651)" fail request 1762 type Error
- "clientCnx(73, LogicalAddres Unspecified/pulsar-broker-3.pulsar-broker.pulsar.svc.cluster.local:6651)" CommandError Error: UnknownError. Message: "org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Error while using ZooKeeper -  ledger=266390 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: Error while using ZooKeeper -  ledger=266390 - operation=Failed to open ledger"
- "producer(44, , -1) ConnectionHandler" Could not get connection to "persistent://customer-service-production/address-service/shipping-address-validation-cancelled-v5" Current state "Ready
  clientCnx(73, LogicalAddres Unspecified/pulsar-broker-3.pulsar-broker.pulsar.svc.cluster.local:6651)" -- Will try again in 189ms 
...

There were several attempts to reconnect, but in the end no further reconnection attempt was logged and the service did not start working again. I have no explicit logs for the consumer. Do producer and consumer share their connection? So yes, there are some log messages from the server, but I expected the client to keep trying to connect until the service was stopped.

I hope it helps you to understand the error.

Regards, Erik

Lanayx commented 10 months ago

I wonder whether the reconnect attempts really stopped. Yes, the ClientCnx is shared between producer and consumer, and connection attempts should continue to happen with an exponential delay.
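
For illustration only (this is not the library's code), the general shape of such a retry policy is a capped exponential backoff with some jitter, which matches the "Will try again in 100ms / 189ms" pattern visible in the logs above:

using System;

static class ReconnectDelay
{
    // Illustrative capped exponential backoff: roughly 100ms, 200ms, 400ms, ...
    // up to maxMs, with a little jitter to spread out reconnect storms.
    public static TimeSpan Next(int attempt, double initialMs = 100, double maxMs = 60_000)
    {
        var exponential = initialMs * Math.Pow(2, attempt);
        var capped = Math.Min(exponential, maxMs);
        var jitter = Random.Shared.NextDouble() * 0.25 * capped;
        return TimeSpan.FromMilliseconds(capped - jitter);
    }
}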

baumerik commented 10 months ago

I changed the title to a more suitable one...

Okay, the connection sharing explains the logs a little better. But I don't understand why no new connection is established and the retry seems to be aborted. After a certain number of connection attempts it stays in a failed state and stops working, but currently I have no detailed logs.

adrianotr commented 1 month ago

Hi! I believe I'm facing the same issue here. It seems to be a problem that affects producers. Consumers seem able to reconnect.

It seems that after a few reconnection attempts the producer closes the ConnectionHandler and enters a deadlock state; the producer.SendAsync call then hangs forever.

dbug: PulsarClient[0] producer(0, standalone-0-9, -1) BeginSendMessage
dbug: PulsarClient[0] producer(0, , -1) DefaultBatcher add message to batch, num messages in batch so far is 0
dbug: PulsarClient[0] producer(0, standalone-0-9, -1) CryptoKeyReader not present, encryption not possible
dbug: PulsarClient[0] producer(0, standalone-0-9, -1) sendMessage sequenceId=23
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) Sending message of type Send
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) Got message of type Ping
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) Sending message of type Pong
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) keepalive tick
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) Sending message of type Ping
warn: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) Socket was disconnected normally while reading
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) ChannelInactive
info: PulsarClient[0] Connection backgroundTask LogicalAddres Unspecified/localhost:6650 removed
dbug: PulsarClient[0] producer(0, standalone-0-9, -1) ConnectionClosed
info: PulsarClient[0] producer(0, , -1) ConnectionHandler Closed connection to persistent://public/default/my-message Current state Ready clientCnx(0, LogicalAddres Unspecified/localhost:6650) -- Will try again in 100ms 
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) fail requests
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) keepalive tick
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) readSocket stopped
info: PulsarClient[0] producer(0, standalone-0-9, -1) Message send timed out. Failing 1 messages
fail: Program[0] Producer exception System.TimeoutException: Could not send message to broker within given timeout    at <StartupCode$Pulsar-Client>.$ProducerImpl.SendMessage@789.MoveNext() in /Users/Adriano.Scheffer/RiderProjects/pulsar-client-dotnet/src/Pulsar.Client/Internal/ProducerImpl.fs:line 796    at Program.<>c__DisplayClass0_0.<<<Main>$>g__SendingTask|2>d.MoveNext() in /Users/Adriano.Scheffer/RiderProjects/pulsar-client-dotnet/src/ConsoleApp1/Program.cs:line 46
info: Program[0] Sending
dbug: PulsarClient[0] producer(0, standalone-0-9, -1) BeginSendMessage
dbug: PulsarClient[0] producer(0, , -1) DefaultBatcher add message to batch, num messages in batch so far is 0
dbug: PulsarClient[0] producer(0, standalone-0-9, -1) CryptoKeyReader not present, encryption not possible
dbug: PulsarClient[0] producer(0, standalone-0-9, -1) sendMessage sequenceId=24
warn: PulsarClient[0] producer(0, standalone-0-9, -1) not connected, skipping send
dbug: PulsarClient[0] producer(0, , -1) ConnectionHandler Starting reconnect to persistent://public/default/my-message
info: PulsarClient[0] Connecting to Broker (LogicalAddres Unspecified/localhost:6650, PhysicalAddress Unspecified/localhost:6650) with maxMessageSize: 5242880
dbug: PulsarClient[0] Socket connecting to Unspecified/localhost:6650
fail: PulsarClient[0] Socket connection failed to Unspecified/localhost:6650 System.Net.Sockets.SocketException (60): Operation timed out    at Pipelines.Sockets.Unofficial.Internal.Throw.Socket(Int32 errorCode) in /_/src/Pipelines.Sockets.Unofficial/Internal/Throw.cs:line 59    at Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs.<GetResult>g__ThrowSocketException|10_0(SocketError e) in /_/src/Pipelines.Sockets.Unofficial/SocketAwaitableEventArgs.cs:line 86    at Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs.GetResult() in /_/src/Pipelines.Sockets.Unofficial/SocketAwaitableEventArgs.cs:line 79    at <StartupCode$Pulsar-Client>.$ConnectionPool.clo@41-23.MoveNext() in /Users/Adriano.Scheffer/RiderProjects/pulsar-client-dotnet/src/Pulsar.Client/Internal/ConnectionPool.fs:line 45
warn: PulsarClient[0] producer(0, , -1) ConnectionHandler Error reconnecting to persistent://public/default/my-message Current state Connecting System.Net.Sockets.SocketException (60): Operation timed out    at Pipelines.Sockets.Unofficial.Internal.Throw.Socket(Int32 errorCode) in /_/src/Pipelines.Sockets.Unofficial/Internal/Throw.cs:line 59    at Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs.<GetResult>g__ThrowSocketException|10_0(SocketError e) in /_/src/Pipelines.Sockets.Unofficial/SocketAwaitableEventArgs.cs:line 86    at Pipelines.Sockets.Unofficial.SocketAwaitableEventArgs.GetResult() in /_/src/Pipelines.Sockets.Unofficial/SocketAwaitableEventArgs.cs:line 79    at <StartupCode$Pulsar-Client>.$ConnectionPool.clo@41-23.MoveNext() in /Users/Adriano.Scheffer/RiderProjects/pulsar-client-dotnet/src/Pulsar.Client/Internal/ConnectionPool.fs:line 45    at Pulsar.Client.Common.Tools.reraize[a](Exception ex) in /Users/Adriano.Scheffer/RiderProjects/pulsar-client-dotnet/src/Pulsar.Client/Common/Tools.fs:line 67    at <StartupCode$Pulsar-Client>.$ConnectionPool.clo@41-23.MoveNext() in /Users/Adriano.Scheffer/RiderProjects/pulsar-client-dotnet/src/Pulsar.Client/Internal/ConnectionPool.fs:line 52    at <StartupCode$Pulsar-Client>.$ConnectionPool.clo@135-25.MoveNext() in /Users/Adriano.Scheffer/RiderProjects/pulsar-client-dotnet/src/Pulsar.Client/Internal/ConnectionPool.fs:line 138    at <StartupCode$Pulsar-Client>.$BinaryLookupService.FindBroker@66.MoveNext() in /Users/Adriano.Scheffer/RiderProjects/pulsar-client-dotnet/src/Pulsar.Client/Internal/BinaryLookupService.fs:line 69    at <StartupCode$Pulsar-Client>.$ConnectionHandler.Pipe #1 input at line 48@48.MoveNext() in /Users/Adriano.Scheffer/RiderProjects/pulsar-client-dotnet/src/Pulsar.Client/Internal/ConnectionHandler.fs:line 61
dbug: PulsarClient[0] producer(0, standalone-0-9, -1) ConnectionFailed
info: PulsarClient[0] producer(0, standalone-0-9, -1) creation failed
info: PulsarClient[0] producer(0, standalone-0-9, -1) stopped
info: PulsarClient[0] producer(0, , -1) ConnectionHandler ConnectionHandler mailbox has stopped normally
info: PulsarClient[0] producer(0, standalone-0-9, -1) mailbox has stopped normally
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) keepalive tick
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) keepalive tick
dbug: PulsarClient[0] clientCnx(0, LogicalAddres Unspecified/localhost:6650) keepalive tick

Because of the System.Net.Sockets.SocketException (timeout) when getting the broker, the connectionHandler and the producer get closed and enter the deadlock state: https://github.com/fsprojects/pulsar-client-dotnet/blob/aebde9ca177e25e4aae783ba4a8671268a3038b2/src/Pulsar.Client/Internal/ProducerImpl.fs#L583-L590
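
As an application-side stopgap (not a fix for the underlying bug), the wait on SendAsync can be bounded so that a wedged producer surfaces as an exception instead of hanging the caller forever. A minimal sketch, using only the Task returned by the producer.SendAsync call from the sample code below:

using System;
using System.Threading.Tasks;

static class SendGuard
{
    // Await a send, but give up after the given timeout instead of hanging forever.
    // Note: this only unblocks the caller; it does not cancel the underlying send.
    public static async Task AwaitWithTimeoutAsync(Task sendTask, TimeSpan timeout)
    {
        var completed = await Task.WhenAny(sendTask, Task.Delay(timeout));
        if (completed != sendTask)
            throw new TimeoutException($"Send did not complete within {timeout}");
        await sendTask; // propagate the real exception, if any
    }
}

// Usage: await SendGuard.AwaitWithTimeoutAsync(producer.SendAsync("message"), TimeSpan.FromSeconds(30));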

How I was able to reproduce the issue

I'm on macOS, so I use pfctl to add a packet filter and block port 6650 to simulate a network failure.

  1. add block in quick inet proto { tcp, udp } from any to any port 6650 to the end of the file /etc/pf.conf
  2. run sudo pfctl -f /etc/pf.conf to reload the file
  3. run sudo pfctl -e to enable the filter and block port 6650
  4. run sudo pfctl -d to disable the filter and free port 6650

The application:

  1. Create an application that sends messages in a loop with a try/catch block
  2. With the filter disabled, run the application.
  3. Block the port (sudo pfctl -e)
  4. Wait until you see producer(0, standalone-0-9, -1) stopped
  5. Unblock the port (sudo pfctl -d)
  6. The producer is now in a deadlock state

Sample code:

// Keep sending in a loop; log failures but don't stop, so we can observe
// whether the producer ever recovers after the port is unblocked.
while (true)
{
    try
    {
        logger.LogInformation("Sending");
        await producer.SendAsync("message");
        await Task.Delay(1000);
    }
    catch (Exception e)
    {
        logger.LogError(e, "Producer exception");
    }
}

I appreciate any help here, since I don't know Pulsar well enough to tell what the correct solution would be.

Lanayx commented 1 month ago

@adrianotr Thank you for the detailed report; the logs were very helpful. I've published 3.5.3 with the fix, can you please check? I'm not closing the issue, since the initial request was about the consumer, not the producer; the fix is only for the producer (to match the consumer behavior).

adrianotr commented 1 month ago

@Lanayx Thank you so much for the quick response. I tested it under the same conditions and I can confirm the producer is now able to reconnect.

PS. Even though the ticket reported a consumer problem, the logs presented in https://github.com/fsprojects/pulsar-client-dotnet/issues/238#issuecomment-1815942901 indicated there were producers running. I also initially suspected the consumers, since my consumers were hanging. It was only after isolating things that I could identify that my consumer was in fact calling a bad producer to publish a new message. So if @baumerik can confirm this fixes his problem, this ticket can be closed.

Again, thank you for the fix!

baumerik commented 1 month ago

Hi, for us it was not so easy to reproduce in the first place, and currently most of our services run .NET 6 (yes, we have to update). Additionally, we have added some functionality to check the connection in our health checks. So since @adrianotr could reproduce it and now says it is fixed, I trust you and will close this issue.

And special thanks to @Lanayx for providing this package and trying to fix everything that gets reported.