blacksky-algorithms / rsky

An AT Protocol Implementation built in Rust.
https://blackskyweb.xyz
Apache License 2.0

Firehose: Gracefully handle errors when making API calls to feedgen #16

Open rudyfraser opened 2 months ago

rudyfraser commented 2 months ago

When we're backfilling records (passing an older sequence number as the subscribeRepos cursor param), the firehose can send a large number of messages at once to the feedgen endpoint, which is responsible for saving the records to the db. Under that load, the requests start to fail.

Example:

Records failed to queue: reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Ipv4(0.0.0.0)), port: None, path: "/queue/likes/create", query: None, fragment: None }, source: hyper::Error(Connect, ConnectError("tcp open error", Os { code: 24, kind: Uncategorized, message: "Too many open files" })) }

This is usually worked around by adding an artificial delay of at least 5-8 ms to the firehose loop:

                println!("Connected to {default_subscriber_path:?}.");
                while let Some(Ok(Message::Binary(message))) = socket.next().await {
                    let client = client.clone();
                    tokio::spawn(async move {
                        process(message, &client).await;
                    });
                    tokio::time::sleep(Duration::from_millis(8)).await; // <----- Artificial delay (async sleep, so the runtime worker thread is not blocked)
                }
            }

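The error itself is caused by opening too many connections to the feedgen Rocket (rocket.rs) server: every message is handed to its own spawned task, each task makes its own request, and the process eventually runs out of file descriptors (OS error 24, "Too many open files").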

We should solve this problem in some way that doesn't result in messages being lost (backoff and retry, some kind of actual queuing, etc.).
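
As a starting point for discussion, here is a minimal sketch of the two ideas combined: a bounded queue that applies backpressure, drained by a fixed number of workers that retry each delivery with exponential backoff. It is not code from rsky. The names `retry_with_backoff`, `spawn_workers`, and `deliver` are hypothetical, the retry limits are arbitrary, and it uses std threads and `std::sync::mpsc::sync_channel` only so that it runs without dependencies. In the firehose itself the same shape would more likely be built on tokio's bounded channel and async sleep.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Call `op` until it succeeds or `max_attempts` calls have been made,
/// doubling the delay after each failure. At least one attempt is always made.
/// Returns the first success, or the last error once attempts are exhausted.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                thread::sleep(delay);
                delay *= 2; // exponential backoff
                attempt += 1;
            }
        }
    }
}

/// Start `workers` threads draining a bounded queue of size `capacity`.
/// `deliver` stands in for the HTTP call to the feedgen (an assumption here).
/// Each worker returns the number of messages that still failed after retries,
/// so failures are counted rather than silently dropped.
fn spawn_workers<M, F>(
    capacity: usize,
    workers: usize,
    deliver: F,
) -> (SyncSender<M>, Vec<thread::JoinHandle<usize>>)
where
    M: Send + 'static,
    F: Fn(&M) -> Result<(), String> + Send + Sync + 'static,
{
    let (tx, rx) = sync_channel::<M>(capacity);
    let rx = Arc::new(Mutex::new(rx));
    let deliver = Arc::new(deliver);
    let handles = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let deliver = Arc::clone(&deliver);
            thread::spawn(move || {
                let mut failed = 0usize;
                loop {
                    // The lock guard is released at the end of this statement.
                    let next = rx.lock().unwrap().recv();
                    let message = match next {
                        Ok(message) => message,
                        Err(_) => break, // all senders dropped, queue drained
                    };
                    let result = retry_with_backoff(5, Duration::from_millis(10), || {
                        (*deliver)(&message)
                    });
                    if result.is_err() {
                        failed += 1;
                    }
                }
                failed
            })
        })
        .collect::<Vec<_>>();
    (tx, handles)
}
```

With this shape the firehose loop would call `tx.send(message)` instead of spawning a task per message. `send` blocks once `capacity` messages are waiting, so at most `workers` requests are in flight at any time and the number of open sockets stays bounded without a fixed sleep. Messages that exhaust their retries still need somewhere to go (a dead-letter table, or rewinding the cursor), which this sketch leaves open.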