influxdata / rskafka

A minimal Rust client for Apache Kafka
Apache License 2.0
292 stars 33 forks

Detect an unavailable connection #212

Closed toondaey closed 1 year ago

toondaey commented 1 year ago

I'm trying to reproduce a situation where the user doesn't have a Kafka instance running or provides the wrong broker address, using the sample code from the README. However, each time I run the code, it gets stuck on this line: let client = ClientBuilder::new(vec![connection]).build().await.unwrap();. I'm not sure if this has something to do with the backoff configuration in the BrokerConnector::new method. I know this may seem like a silly question, but I'm still finding my way around Rust, so I'd really appreciate some assistance with this.

crepererum commented 1 year ago

The system is currently designed to eventually converge, so it retries forever. We could probably add a deadline to the backoff system so users can opt out of that.

toondaey commented 1 year ago

I get it now. Is there a reason why Backoff or BackoffConfig is not public? Or maybe users can use the ClientBuilder with some of the BackoffConfig options to configure this.

crepererum commented 1 year ago

I think the user should be able to pass a BackoffConfig to ClientBuilder. If that's not already possible, this is likely an oversight / missing API feature.

toondaey commented 1 year ago

Currently, the ClientBuilder doesn't accept BackoffConfig.
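
A minimal sketch of what such a builder option could look like, assuming a setter method on the builder. The type `SketchClientBuilder`, the method `backoff_config`, the field names, and the default values are all hypothetical and for illustration only; they are not taken from rskafka.

```rust
use std::time::Duration;

/// Illustrative backoff settings (hypothetical fields, not rskafka's definition).
#[derive(Debug, Clone, PartialEq)]
pub struct BackoffConfig {
    pub init_backoff: Duration,
    pub max_backoff: Duration,
    pub base: f64,
}

impl Default for BackoffConfig {
    fn default() -> Self {
        // Arbitrary example defaults chosen for this sketch.
        Self {
            init_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(10),
            base: 2.0,
        }
    }
}

/// Hypothetical stand-in for a client builder.
pub struct SketchClientBuilder {
    bootstrap_brokers: Vec<String>,
    backoff_config: BackoffConfig,
}

impl SketchClientBuilder {
    /// Starts with the default backoff settings.
    pub fn new(bootstrap_brokers: Vec<String>) -> Self {
        Self {
            bootstrap_brokers,
            backoff_config: BackoffConfig::default(),
        }
    }

    /// Overrides the backoff settings; consumes and returns the builder
    /// so calls can be chained.
    pub fn backoff_config(mut self, backoff_config: BackoffConfig) -> Self {
        self.backoff_config = backoff_config;
        self
    }

    /// Returns the backoff settings that would be used for connections.
    pub fn backoff(&self) -> &BackoffConfig {
        &self.backoff_config
    }

    /// Returns the configured bootstrap brokers.
    pub fn bootstrap_brokers(&self) -> &[String] {
        &self.bootstrap_brokers
    }
}
```

With this shape, callers who never touch the setter keep the defaults, so adding the option would not change existing behavior.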

toondaey commented 1 year ago

Raised PR #213 to add BackoffConfig as an option for ClientBuilder.

toondaey commented 1 year ago

What do you think about this implementation for a retry limit?

// ...
const DEFAULT_RETRY_LIMIT: u32 = 3;
// ...
pub struct BackoffConfig {
    // ...
    pub retry_limit: u32,
}

impl Default for BackoffConfig {
    fn default() -> Self {
        Self {
            // ...
            retry_limit: DEFAULT_RETRY_LIMIT
        }
    }
}

#[derive(Debug, Error)]
#[allow(missing_copy_implementations)]
pub enum BackoffError {
    // A non-empty message so the error is readable when displayed.
    #[error("maximum number of retries reached")]
    MaxRetry,
}
pub type BackoffResult<T> = Result<T, BackoffError>;

pub struct Backoff {
    // ...
    retry_limit: u32,
}

// The method takes `&mut self`, so it has to live inside an `impl` block.
impl Backoff {
    pub async fn retry_with_backoff<F, F1, B, E>(
        &mut self,
        request_name: &str,
        do_stuff: F,
    ) -> BackoffResult<B>
    where
        F: (Fn() -> F1) + Send + Sync,
        F1: std::future::Future<Output = ControlFlow<B, ErrorOrThrottle<E>>> + Send,
        E: std::error::Error + Send,
    {
        let mut attempt_count = 0;
        loop {
            // ...
            // Give up once the configured number of attempts has been used.
            if attempt_count >= self.retry_limit {
                break Err(BackoffError::MaxRetry);
            }
            attempt_count += 1;
            // ...
        }
    }
}

crepererum commented 1 year ago

BTW: the backoff system was originally shared with the IOx code base and there we now have a retry limit for exactly that reason. IIRC we use a deadline time instead of a count because the count can be a bit unintuitive due to the jitter. What's your opinion on using a duration instead of a count?
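
To make the duration-based alternative concrete, here is a rough synchronous sketch using only the standard library. It is not the IOx implementation: the names `retry_with_deadline`, `deadline`, and `DeadlineExceeded` are hypothetical, the loop blocks the thread instead of being async, and jitter is left out to keep it dependency-free.

```rust
use std::ops::ControlFlow;
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Illustrative settings (hypothetical fields, not the rskafka or IOx definition).
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    pub init_backoff: Duration,
    pub max_backoff: Duration,
    pub base: f64,
    /// Total time budget for all attempts; `None` means retry forever.
    pub deadline: Option<Duration>,
}

#[derive(Debug, PartialEq)]
pub enum BackoffError {
    /// The time budget ran out before the operation succeeded.
    DeadlineExceeded { deadline: Duration },
}

/// Calls `op` until it returns `ControlFlow::Break(value)` or the deadline passes.
/// `ControlFlow::Continue(err)` signals a retryable failure.
pub fn retry_with_deadline<T, E, F>(config: &BackoffConfig, mut op: F) -> Result<T, BackoffError>
where
    F: FnMut() -> ControlFlow<T, E>,
{
    let start = Instant::now();
    let mut backoff = config.init_backoff;
    loop {
        if let ControlFlow::Break(value) = op() {
            return Ok(value);
        }
        // The limit is expressed in elapsed time, so it does not depend on
        // how many attempts happened to fit into the budget.
        if let Some(deadline) = config.deadline {
            if start.elapsed() >= deadline {
                return Err(BackoffError::DeadlineExceeded { deadline });
            }
        }
        sleep(backoff);
        // Exponential growth, capped at `max_backoff`.
        backoff = backoff.mul_f64(config.base).min(config.max_backoff);
    }
}
```

Because the check happens after a failed attempt, the total run time can overshoot the deadline by up to one sleep plus one attempt; a stricter variant could shorten the final sleep to fit the remaining budget.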

toondaey commented 1 year ago

Judging by what I've read here, I think I'll try replicating the implementation you have in the IOx code base. My approach may not be the best after all.

crepererum commented 1 year ago

I think we can just copy the code from https://github.com/influxdata/influxdb_iox/tree/main/backoff. License-wise this is OK.