blacksky-algorithms / rsky

An AT Protocol Implementation built in Rust.
https://blackskyweb.xyz
Apache License 2.0
163 stars 8 forks source link

Firehose: Add command line flag for firehose cursor #15

Open rudyfraser opened 2 months ago

rudyfraser commented 2 months ago

Right now the firehose always starts at the current stream but in situations where you want to catch up/backfill records you'd need to manually change the code to add a cursor parameter like so:

#[tokio::main]
async fn main() {
    match dotenvy::dotenv() {
        _ => (),
    };

    let default_subscriber_path =
        env::var("FEEDGEN_SUBSCRIPTION_ENDPOINT").unwrap_or("wss://bsky.social".into());
    let client = reqwest::Client::new();
    loop {
        match tokio_tungstenite::connect_async(
            Url::parse(
                format!(
                    "{}/xrpc/com.atproto.sync.subscribeRepos?cursor=1670190430", // Hardcoded cursor. Should be a command line flag.
                    default_subscriber_path
                )
                .as_str(),
            )
            .unwrap(),
        )
        .await
        {
            Ok((mut socket, _response)) => {
                println!("Connected to {default_subscriber_path:?}.");
                while let Some(Ok(Message::Binary(message))) = socket.next().await {
                    let client = client.clone();
                    tokio::spawn(async move {
                        process(message, &client).await;
                    });
                    thread::sleep(Duration::from_millis(8)); // Artificial delay that may be needed when backfilling records due to related issue
                }
            }
            Err(error) => {
                eprintln!("Error connecting to {default_subscriber_path:?}. Waiting to reconnect: {error:?}");
                thread::sleep(Duration::from_millis(500));
                continue;
            }
        }
    }
}

Instead of hardcoding a cursor param you should be able to use an optional command line flag. If there's no flag, it should default to not including a cursor param.