fsprojects / pulsar-client-dotnet

Apache Pulsar native client for .NET (C#/F#/VB)
MIT License

Add support for Pulsar cluster level auto failover (PIP-121) #217

Status: Open. LeoSht opened this issue 1 year ago.

LeoSht commented 1 year ago

Please add support for cluster-level auto failover, similar to the Java client. Related information: Cluster Level Fail Over (PIP-121), Java Client MR.

Lanayx commented 1 year ago

Hi! I'm not actively using Pulsar and I'm not adding new functionality to the library myself (only bug fixes), but I'm always happy to help if someone wants to add something. So if you need this feature, you can send a PR.

fjod commented 1 year ago

Here's what I've found so far: in the Java code, when the cluster address changes, they do this:

   pulsarClient.updateServiceUrl(serviceUrl);
   pulsarClient.reloadLookUp();

In updateServiceUrl:

   log.info("Updating service URL to {}", serviceUrl);
   conf.setServiceUrl(serviceUrl);
   lookup.updateServiceUrl(serviceUrl);
   cnxPool.closeAllConnections();

and in reloadLookUp:

   if (conf.getServiceUrl().startsWith("http")) {
       lookup = new HttpLookupService(conf, eventLoopGroup);
   } else {
       lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
               externalExecutorProvider.getExecutor());
   }
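The sequence quoted above can be sketched as a small self-contained Java model. Only the order of operations and the `startsWith("http")` check come from the Java client code; `LookupSketch`, `FailoverClientSketch` and the other names are hypothetical stand-ins, and no real networking happens.

```java
// Hypothetical stand-ins for the two lookup implementations; the names are
// illustrative only and are not the real Pulsar Java client classes.
abstract class LookupSketch {
    private String serviceUrl;

    LookupSketch(String serviceUrl) { this.serviceUrl = serviceUrl; }

    // Re-points an existing lookup at a new address (like lookup.updateServiceUrl).
    void updateServiceUrl(String url) { this.serviceUrl = url; }

    String serviceUrl() { return serviceUrl; }
}

final class HttpLookupSketch extends LookupSketch {
    HttpLookupSketch(String url) { super(url); }
}

final class BinaryLookupSketch extends LookupSketch {
    BinaryLookupSketch(String url) { super(url); }
}

// Hypothetical client mirroring the two calls quoted above:
// updateServiceUrl(...) followed by reloadLookUp().
final class FailoverClientSketch {
    private String serviceUrl;
    private LookupSketch lookup;
    private int closeAllCalls = 0; // counts simulated cnxPool.closeAllConnections() calls

    FailoverClientSketch(String serviceUrl) {
        this.serviceUrl = serviceUrl;
        this.lookup = newLookup(serviceUrl);
    }

    // Same scheme check as reloadLookUp: URLs starting with "http" get an
    // HTTP lookup, everything else (e.g. pulsar://) gets a binary lookup.
    static LookupSketch newLookup(String url) {
        return url.startsWith("http") ? new HttpLookupSketch(url) : new BinaryLookupSketch(url);
    }

    void updateServiceUrl(String url) {
        serviceUrl = url;             // conf.setServiceUrl(serviceUrl)
        lookup.updateServiceUrl(url); // lookup.updateServiceUrl(serviceUrl)
        closeAllCalls++;              // cnxPool.closeAllConnections()
    }

    void reloadLookUp() {
        lookup = newLookup(serviceUrl); // rebuild the lookup for the (possibly new) scheme
    }

    LookupSketch lookup() { return lookup; }

    int closeAllCalls() { return closeAllCalls; }
}
```

In this model the lookup type only changes on `reloadLookUp`; `updateServiceUrl` just re-points the existing lookup and closes connections, which matters when switching between `pulsar://` and `http://` URLs.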

So far I've done the same thing in pulsarClient:

   backgroundTask {
       let q = { config with ServiceAddresses = addresses }
       // old connections are closed before the new pool and lookup service exist
       do! connectionPool.CloseAsync()
       connectionPool <- ConnectionPool(q)
       lookupService <- BinaryLookupService(q, connectionPool)
   }

but all producers are still connected to the old address. It looks like in the Java code, once all connections are closed, producers/consumers go to the new lookup service to get the new addresses(?), but in F# they don't.

Lanayx commented 1 year ago

I think you need to ensure that, by the time the connection is dropped, consumers/producers already have the new address for reconnection. In your code the connection is dropped first, so the reconnect probably happens immediately to the same address.
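The ordering point can be illustrated with a tiny Java model. It assumes, hypothetically, that a producer reconnects as soon as its connection is closed and resolves the address from whatever the client's current lookup is at that moment; `ReconnectOrderingSketch` and its methods are made up for illustration and are not part of either client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the two orderings discussed in this thread.
final class ReconnectOrderingSketch {
    // Current service address; stands in for the client's lookupService/connectionPool fields.
    private final AtomicReference<String> currentAddress;
    // Addresses the simulated producer reconnected to, in order.
    private final List<String> reconnects = new ArrayList<>();

    ReconnectOrderingSketch(String initial) {
        currentAddress = new AtomicReference<>(initial);
    }

    // Simulated connection drop: the producer immediately reconnects using the current address.
    private void closeConnectionAndReconnect() {
        reconnects.add(currentAddress.get());
    }

    // First attempt's order: drop connections, then publish the new address.
    void switchCloseFirst(String newAddress) {
        closeConnectionAndReconnect();
        currentAddress.set(newAddress);
    }

    // Suggested order: publish the new address, then drop the old connections.
    void switchSwapFirst(String newAddress) {
        currentAddress.set(newAddress);
        closeConnectionAndReconnect();
    }

    List<String> reconnects() {
        return reconnects;
    }
}
```

With `switchCloseFirst` the simulated reconnect lands on the old address; with `switchSwapFirst` it lands on the new one.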

fjod commented 1 year ago

once connection is dropped consumers/producers should already have new address for reconnection

I tried rewriting it like this:

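   backgroundTask {
       let q = { config with ServiceAddresses = addresses }
       // keep a handle on the old pool so it can be closed after the swap
       let oldPool = connectionPool
       connectionPool <- ConnectionPool(q)
       lookupService <- BinaryLookupService(q, connectionPool)
       // only now drop the old connections, after the new pool and lookup are in place
       do! oldPool.CloseAsync()
   }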

Interestingly, if I start the demo app plus 2 clusters on ports 6650 and 6651, and a web app that tells Pulsar to switch from 6650 to 6651, it works :) But if I start with 6650 and then boot up the app to broadcast the new address 6651, it fails to reconnect to 6651 with this inner exception:

Unhandled exception. System.TimeoutException: Could not send message to broker within given timeout
   at <StartupCode$Pulsar-Client>.$ProducerImpl.SendMessage@760.MoveNext() in F:\work\pulsar-client-dotnet\src\Pulsar.Client\Internal\ProducerImpl.fs:line 767

So it seems that if there is already an established connection to the broker, my code fails to change the address. But if there is no connection yet (the address is switched before the client connects to the broker), it works OK.
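One possible reading of this symptom, offered only as a hypothesis and not checked against ProducerImpl.fs: if a producer keeps the broker address it resolved on its first connect and reuses it on reconnect, then switching before the first connect works while switching after it does not. The Java model below makes that assumption explicit; `CachingProducerSketch` and its `reResolveOnReconnect` flag are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical producer that may cache the broker address from its first lookup.
// This is an assumed behaviour used to illustrate the symptom, not the F# client's code.
final class CachingProducerSketch {
    private final Supplier<String> lookup;       // returns the client's *current* service address
    private final boolean reResolveOnReconnect;  // true: ask the lookup again on every connect
    private String cachedBroker;                 // null until the first connect
    private final List<String> connections = new ArrayList<>();

    CachingProducerSketch(Supplier<String> lookup, boolean reResolveOnReconnect) {
        this.lookup = lookup;
        this.reResolveOnReconnect = reResolveOnReconnect;
    }

    // Connects (or reconnects), recording which address was used.
    void connect() {
        if (cachedBroker == null || reResolveOnReconnect) {
            cachedBroker = lookup.get(); // consult whatever lookup is current right now
        }
        connections.add(cachedBroker);
    }

    List<String> connections() {
        return connections;
    }
}
```

With caching, a switch made before the first `connect()` is picked up, but a switch made after it is not; re-resolving on every reconnect picks up the new address in both cases.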

Lanayx commented 1 year ago

@fjod I think you can already create a PR, so I can try it myself and help.