LemmyNet / activitypub-federation-rust

High-level Rust library for the Activitypub protocol
GNU Affero General Public License v3.0

Remove `actix-rt` and replace with tokio tasks #42

Closed cetra3 closed 1 year ago

cetra3 commented 1 year ago

This removes the background-jobs crate and replaces it with an inline activity queue that spawns tokio tasks to handle the background queue.

I haven't given this a good test yet, as it doesn't appear very easy to benchmark/stress test the activity queue out, so would be interested in thoughts there.

This could be built into something with a bit more persistence in the future & generalise on the task submission.

Resolves https://github.com/LemmyNet/activitypub-federation-rust/issues/38 & https://github.com/LemmyNet/activitypub-federation-rust/issues/32

Nutomic commented 1 year ago

For testing I would try to write a unit test which launches a few thousand jobs, each of which sleeps a while and then increments an atomic int. Then check that the counter has reached the expected value. Plus another test where jobs keep some internal state in order to fail on first run to ensure that retry works.
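A minimal sketch of the two tests described above. To keep it dependency-free it uses `std::thread` as a stand-in for the queue's tokio tasks, and `run_counting_jobs`, `FlakyJob` and `run_with_retry` are hypothetical names for illustration, not part of the library.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Spawns `jobs` threads; each sleeps briefly, then increments a shared counter.
/// Returns the counter value once every job has been joined.
fn run_counting_jobs(jobs: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(5));
                counter.fetch_add(1, Ordering::Relaxed);
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("job panicked");
    }
    counter.load(Ordering::Relaxed)
}

/// Hypothetical job that keeps internal state: it fails on its first
/// attempt and succeeds on every later one.
struct FlakyJob {
    attempts: AtomicUsize,
}

impl FlakyJob {
    fn run(&self) -> Result<(), &'static str> {
        if self.attempts.fetch_add(1, Ordering::Relaxed) == 0 {
            Err("first attempt always fails")
        } else {
            Ok(())
        }
    }
}

/// Runs the job up to `max_attempts` times and returns the attempt number
/// that succeeded, or `None` if every attempt failed.
fn run_with_retry(job: &FlakyJob, max_attempts: usize) -> Option<usize> {
    (1..=max_attempts).find(|_| job.run().is_ok())
}
```

A test would assert that `run_counting_jobs(n)` returns `n`, and that `run_with_retry` succeeds on the second attempt for a fresh `FlakyJob`. OS threads are heavier than tokio tasks, so a std-only version is better run with a few hundred jobs than a few thousand.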

cetra3 commented 1 year ago

OK, I've wired up a pretty janky test to benchmark what's happening.

I get about 3,000 msg/s, which seems to be consistent regardless of how many workers/messages are in the queue. I've wired this test up to a flame graph I generated with:

cargo flamegraph --unit-test -- test_activity_queue_workers

[image: flamegraph]

The flamegraph shows that the overhead from tokio is insignificant, keeping in mind that all requests go to localhost, so there is no real network traversal. The majority of the time is spent doing two things:

I had a look through the activitypub spec & the source for mastodon to find out whether we could get away with reusing signed requests. It looks as though there is a bit of slop either side but the window is ~1 hour (if you account for CLOCK_SKEW_MARGIN). If we had tight retry times then we might get away with it, but I think the intent is that the request should be re-signed if it is tried at a later date.

Reading and parsing a PEM file, however, is probably an easy performance win. There are a couple of paths:

I think I can adjust the scope of this PR and try either one.

I think I like the latter option: adjust traits and keep PKey<Private> around in its decoded form.

Nutomic commented 1 year ago

HTTP signatures expire after 10 seconds, which is the default in the http-signature-normalization library. This can be changed on the sending side here, and in fact I don't see any problem with using a higher value. It might even make delivery more reliable. Here is the HTTP signature standard by the way. At least for the first retry (after 60s) the signature could be reused if it's not too complicated to implement.
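The arithmetic behind reusing a signature on retry can be sketched as follows. `signature_still_valid` is a hypothetical helper, times are measured from an arbitrary fixed start, and the 300-second expiry is only an example of a "higher value", not a setting taken from the library.

```rust
use std::time::Duration;

/// Returns true if a signature created at `signed_at` is still inside its
/// `expiry` window when a retry happens at `retry_at`.
/// Both instants are expressed as durations since the same fixed start.
fn signature_still_valid(signed_at: Duration, retry_at: Duration, expiry: Duration) -> bool {
    // Age of the signature at retry time; saturates at zero if the clock
    // values are passed in the wrong order.
    let age = retry_at.saturating_sub(signed_at);
    age <= expiry
}
```

With the 10-second default, a signature made at t = 0 has expired by the first retry at t = 60 s, so the request must be re-signed. With an illustrative expiry of 300 s, the same signature would still be accepted at t = 60 s.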

I always prefer stricter typing, so using PKey<Private> instead of string sounds good. This can be done in send_activity() before the loop, so it only needs to be done once when sending an activity to many inboxes (a common case), and it doesn't require any API change. Though changing ActorType::private_key_pem() to return PKey<Private> instead of string would arguably be cleaner.
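A rough sketch of hoisting the key parsing out of the per-inbox loop. Everything here is a stand-in so the example runs without OpenSSL: `ParsedKey` plays the role of `PKey<Private>`, and `parse_pem`, `sign` and `send_to_inboxes` are hypothetical functions that do no real cryptography.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Counts how often the stand-in PEM parser runs, to make the saving visible.
static PARSE_CALLS: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for a decoded private key such as `PKey<Private>`.
struct ParsedKey {
    pem_len: usize,
}

/// Stand-in for the expensive PEM decoding step (no real parsing happens).
fn parse_pem(pem: &str) -> Result<ParsedKey, String> {
    PARSE_CALLS.fetch_add(1, Ordering::Relaxed);
    if !pem.starts_with("-----BEGIN") {
        return Err("not a PEM document".to_string());
    }
    Ok(ParsedKey { pem_len: pem.len() })
}

/// Stand-in for signing one request with an already decoded key.
fn sign(key: &ParsedKey, inbox: &str) -> String {
    format!("signed:{}:{}", key.pem_len, inbox)
}

/// Decodes the key once, before the loop, then reuses it for every inbox.
fn send_to_inboxes(pem: &str, inboxes: &[&str]) -> Result<Vec<String>, String> {
    let key = parse_pem(pem)?; // parsed exactly once per activity
    Ok(inboxes.iter().map(|inbox| sign(&key, inbox)).collect())
}
```

Sending to three inboxes this way calls the parser once instead of three times, which is the saving described above for the common many-inboxes case.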

By the way #43 should also be helpful.

cetra3 commented 1 year ago

OK I've adjusted the expiry to be ~5 mins & refactored some of the internals to only parse the PEM data once. This has given a boost to ~20,000 msg/s on my PC which I'm a little happier with. I have kept the actor trait as it is. It can be refactored out later if it's still proving to be a bottleneck.

I've adjusted the test server to fail every now and then to test the retry functionality.

I've also adjusted the activity payload from a string to Bytes so that it's more cheaply cloneable, since the payload is actually shared between inboxes etc. I have also condensed the JSON by not using the pretty printing methods. This should save a few bytes.
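To illustrate why a reference-counted buffer is cheaper to clone than a `String`, here is a std-only sketch that uses `Arc<[u8]>` as a stand-in for `Bytes`. `share_payload` is a hypothetical helper, not code from this PR.

```rust
use std::sync::Arc;

/// Copies the payload into one shared buffer, then hands out one handle per
/// inbox. Cloning a handle only bumps a reference count; the bytes themselves
/// are never copied again.
fn share_payload(payload: &[u8], inboxes: usize) -> Vec<Arc<[u8]>> {
    let shared: Arc<[u8]> = Arc::from(payload); // the single copy of the data
    (0..inboxes).map(|_| Arc::clone(&shared)).collect()
}
```

Every handle returned by `share_payload` points at the same allocation, which `Arc::ptr_eq` can confirm. Cloning a `String` for each inbox would instead allocate and copy the whole body every time.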

In terms of #43, I don't know how correct it could be if you sent a compressed activity to a webserver that doesn't support compression. Normally HTTP compression works by a client making a request with appropriate Accept-Encoding headers, and the server then compresses the response or not based upon what the requester accepts. In this scenario we are making a POST, so we're the requester, and without making an initial request first to work out what the server accepts, we might find that it doesn't support compression.

I am cautiously optimistic that the changes I've made should help with performance. However it would need to be tested in more of a "real world" scenario, as the benchmarking I'm doing is rather synthetic.

phiresky commented 1 year ago

@cetra3 could you share what code / command you use for benchmarking now?

phiresky commented 1 year ago

Here's my suggestion (as a git patch). Performance seems mostly unaffected by these changes based on your test_activity_queue_workers benchmark (without dodgy).

diff --git a/Cargo.toml b/Cargo.toml
index 37471ab..c0c77ff 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -86,3 +86,6 @@ path = "examples/local_federation/main.rs"
 [[example]]
 name = "live_federation"
 path = "examples/live_federation/main.rs"
+
+[profile.release]
+lto = "fat" # doesn't seem to make much of a difference (maybe 10%) but still a good idea?
\ No newline at end of file
diff --git a/src/activity_queue.rs b/src/activity_queue.rs
index 817cbf2..e23c899 100644
--- a/src/activity_queue.rs
+++ b/src/activity_queue.rs
@@ -30,8 +30,8 @@ use std::{
     time::{Duration, SystemTime},
 };
 use tokio::{
-    sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
-    task::JoinHandle,
+    sync::mpsc::{unbounded_channel, UnboundedSender},
+    task::{JoinHandle, JoinSet},
 };
 use tracing::{debug, info, warn};
 use url::Url;
@@ -219,11 +219,9 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
 /// Uses an unbounded mpsc queue for communication (i.e, all messages are in memory)
 pub(crate) struct ActivityQueue {
     // Our "background" tasks
-    senders: Vec<UnboundedSender<SendActivityTask>>,
-    handles: Vec<JoinHandle<()>>,
+    sender: UnboundedSender<SendActivityTask>,
+    recv_handle: JoinHandle<()>,
     reset_handle: JoinHandle<()>,
-    // Round robin of the sender list
-    last_sender_idx: AtomicUsize,
     // Stats shared between the queue and workers
     stats: Arc<Stats>,
 }
@@ -251,26 +249,24 @@ struct RetryStrategy {
 async fn worker(
     client: ClientWithMiddleware,
     timeout: Duration,
-    mut receiver: UnboundedReceiver<SendActivityTask>,
+    message: SendActivityTask,
     stats: Arc<Stats>,
     strategy: RetryStrategy,
 ) {
-    while let Some(message) = receiver.recv().await {
-        stats.pending.fetch_sub(1, Ordering::Relaxed);
-        stats.running.fetch_add(1, Ordering::Relaxed);
+    stats.pending.fetch_sub(1, Ordering::Relaxed);
+    stats.running.fetch_add(1, Ordering::Relaxed);

-        let outcome = retry(|| sign_and_send(&message, &client, timeout), strategy).await;
+    let outcome = retry(|| sign_and_send(&message, &client, timeout), strategy).await;

-        // "Running" has finished, check the outcome
-        stats.running.fetch_sub(1, Ordering::Relaxed);
+    // "Running" has finished, check the outcome
+    stats.running.fetch_sub(1, Ordering::Relaxed);

-        match outcome {
-            Ok(_) => {
-                stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
-            }
-            Err(_err) => {
-                stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
-            }
+    match outcome {
+        Ok(_) => {
+            stats.completed_last_hour.fetch_add(1, Ordering::Relaxed);
+        }
+        Err(_err) => {
+            stats.dead_last_hour.fetch_add(1, Ordering::Relaxed);
         }
     }
 }
@@ -282,10 +278,6 @@ impl ActivityQueue {
         timeout: Duration,
         strategy: RetryStrategy,
     ) -> Self {
-        // Keep a vec of senders to send our messages to
-        let mut senders = Vec::with_capacity(worker_count);
-        let mut handles = Vec::with_capacity(worker_count);
-
         let stats: Arc<Stats> = Default::default();

         // This task clears the dead/completed stats every hour
@@ -299,36 +291,42 @@ impl ActivityQueue {
             }
         });

-        // Spawn our workers
-        for _ in 0..worker_count {
-            let (sender, receiver) = unbounded_channel();
-            handles.push(tokio::spawn(worker(
-                client.clone(),
-                timeout,
-                receiver,
-                stats.clone(),
-                strategy,
-            )));
-            senders.push(sender);
-        }
+        let (sender, mut receiver) = unbounded_channel();
+        let stats2 = stats.clone();
+        let recv_handle = tokio::spawn(async move {
+            let mut join_set = JoinSet::new();
+            while let Some(task) = receiver.recv().await {
+                join_set.spawn(worker(
+                    client.clone(),
+                    timeout,
+                    task,
+                    stats2.clone(),
+                    strategy,
+                ));
+                while join_set.len() > worker_count {
+                    // prevent there being more than worker_count running tasks
+                    join_set.join_next().await;
+                }
+            }
+            // make sure all tasks are done during shut down
+            while !join_set.is_empty() {
+                join_set.join_next().await;
+            }
+        });

         Self {
-            senders,
-            handles,
+            sender,
+            recv_handle,
             reset_handle,
-            last_sender_idx: AtomicUsize::new(0),
             stats,
         }
     }
     async fn queue(&self, message: SendActivityTask) -> Result<(), anyhow::Error> {
-        // really basic round-robin to our workers, we just do mod on the len of senders
-        let idx_to_send = self.last_sender_idx.fetch_add(1, Ordering::Relaxed) % self.senders.len();
-
         // Set a queue to pending
         self.stats.pending.fetch_add(1, Ordering::Relaxed);

         // Send to one of our workers
-        self.senders[idx_to_send].send(message)?;
+        self.sender.send(message)?;

         Ok(())
     }
@@ -340,15 +338,13 @@ impl ActivityQueue {
     #[allow(unused)]
     // Drops all the senders and shuts down the workers
     async fn shutdown(self) -> Result<Stats, anyhow::Error> {
-        drop(self.senders);
+        drop(self.sender);

         // stop the reset counter task
         self.reset_handle.abort();
         self.reset_handle.await.ok();

-        for handle in self.handles {
-            handle.await?;
-        }
+        self.recv_handle.await?;

         Arc::try_unwrap(self.stats).map_err(|_| anyhow!("Could not retrieve stats"))
     }
@@ -438,32 +434,54 @@ mod tests {
         }
         Ok(())
     }
+    // This will periodically send back internal errors to test the retry
+    async fn reliable_handler(
+        State(state): State<Arc<AtomicUsize>>,
+        headers: HeaderMap,
+        body: Bytes,
+    ) -> Result<(), StatusCode> {
+        debug!("Headers:{:?}", headers);
+        debug!("Body len:{}", body.len());
+
+        if state.fetch_add(1, Ordering::Relaxed) % 20 == 0 {
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+        Ok(())
+    }

-    async fn test_server() {
+    async fn test_server(dodgy: bool) {
         use axum::{routing::post, Router};

         // We should break every now and then ;)
         let state = Arc::new(AtomicUsize::new(0));
-
-        let app = Router::new()
-            .route("/", post(dodgy_handler))
-            .with_state(state);
-
+        let app = if dodgy {
+            Router::new()
+                .route("/", post(dodgy_handler))
+                .with_state(state)
+        } else {
+            Router::new()
+                .route("/", post(reliable_handler))
+                .with_state(state)
+        };
         axum::Server::bind(&"0.0.0.0:8001".parse().unwrap())
             .serve(app.into_make_service())
             .await
             .unwrap();
     }

+    // #[ignore]
+    #[tokio::test(flavor = "multi_thread")]
+    async fn bench_activity_queue_workers() {
+        activity_queue_workers(64, 100_000, false).await
+    }
     #[tokio::test(flavor = "multi_thread")]
-    // Queues 10_000 messages and then asserts that the worker runs them
     async fn test_activity_queue_workers() {
-        let num_workers = 64;
-        let num_messages: usize = 100;
-
-        tokio::spawn(test_server());
+        activity_queue_workers(64, 100, true).await
+    }
+    // Queues 10_000 messages and then asserts that the worker runs them
+    async fn activity_queue_workers(num_workers: usize, num_messages: usize, dodgy: bool) {
+        tokio::spawn(test_server(dodgy));

-        /*
         // uncomment for debug logs & stats
         use tracing::log::LevelFilter;

@@ -472,7 +490,6 @@ mod tests {
             .filter_module("activitypub_federation", LevelFilter::Info)
             .format_timestamp(None)
             .init();
-        */

         let activity_queue = ActivityQueue::new(
             reqwest::Client::default().into(),

cetra3 commented 1 year ago

@phiresky Thanks for your diff! I couldn't apply it to my branch for some reason, but I have made adjustments in the spirit of the changes you introduced.

I am doing the following to benchmark:

I've adjusted it to use tokio::spawn directly rather than have the worker tasks hang around & don't really see a dip in performance. I did try that yesterday but was seeing a reduction in throughput. It looks like if there are too many workers there is some bookkeeping getting in the way of performance, but if you set the num_workers to be a low but sensible number, it seems to work OK.

Having said that: if we have tasks that are going to hang around for 60 hours it's going to need to be fairly high since there could be a lot of outstanding requests retrying.

For that reason, I've adjusted it to:

phiresky commented 1 year ago

Great!

Having a separate queue for retries definitely makes sense, though long term they should probably be stored in the DB or a file instead after the first or second failure (or while shutting down).

I think your use of the semaphore is a bit wrong, I'll add an inline comment about that.

phiresky commented 1 year ago

I just realized there's another issue with this whole thing: It replaces the main runtime with the normal tokio runtime, but I think when this library is instantiated from lemmy it has to run in lemmy's runtime which is also actix_rt and can't be switched over because other lemmy code also uses actors?

cetra3 commented 1 year ago

@phiresky You can run this within actix_rt as it uses tokio under the hood, i.e. it will be compatible. However I do have a PR opened that does use tokio::main as I feel it will give better performance, especially with db pools and other things. The only actors lemmy was using were in the websocket stuff that is pulled out of the latest release. I've run the PR locally without issue, but obviously more testing is required.

Good point on the join_set. The permit was passed down to the child task so the semaphore was working fine, but the join_set would continue to grow larger. I've adjusted the join_set to apply backpressure as per your example, but before the task is spawned.

I do agree there should be some persistence added to the outgoing activity queue at some point, especially for retries.

Nutomic commented 1 year ago

Looks good, thanks! However you didn't remove the worker_count parameter after all, what's the reason for keeping it?

phiresky commented 1 year ago

The worker_count parameter has a different meaning after this PR, but it is still needed for two reasons:

  1. JoinSet is used. JoinSet has (for some reason) no method to clear all already-finished items, so without a limit it would grow forever. Maybe this could also be fixed by just polling join_next() until it's not ready anymore.
    • JoinSet is needed to ensure tasks are finished on shutdown. Just found an alternative method for that here
  2. In order to put backpressure somewhere. Right now it's kind of useless because the mpsc queue is unbounded anyway. Longer term, I think the queue should also be bounded, with excess items logged and dropped (if it can't send as fast as it's getting new items, then letting it queue up more and more doesn't improve the situation).
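The "bounded queue, drop the excess" idea from the second point can be sketched with the standard library's bounded channel. This is an illustration only: `bounded_queue` and `enqueue_or_drop` are hypothetical names, `u32` stands in for the real task type, and the PR itself uses tokio's unbounded mpsc channel.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Creates a queue that holds at most `capacity` pending items.
fn bounded_queue(capacity: usize) -> (SyncSender<u32>, Receiver<u32>) {
    sync_channel(capacity)
}

/// Tries to enqueue every item without blocking. Items that do not fit are
/// dropped; the function returns how many were dropped.
fn enqueue_or_drop(sender: &SyncSender<u32>, items: &[u32]) -> usize {
    let mut dropped = 0;
    for &item in items {
        match sender.try_send(item) {
            Ok(()) => {}
            // Queue is full: a real implementation would log the item here.
            Err(TrySendError::Full(_)) => dropped += 1,
            // Receiver is gone, so nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    dropped
}
```

With a capacity of 2 and no consumer running, offering five items keeps the first two and drops the other three. tokio's bounded `mpsc::channel` offers a similar non-blocking `try_send`, so the same shape should carry over to the async queue.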

In summary, I think this should be better than it was before, but I'm pretty sure it's not the optimal solution and I guess none of us know how that would look right now ;)

phiresky commented 1 year ago

@cetra3 I wanted to mention that I think it should be possible after all to clear out finished tasks, even if the number is not bounded, by directly using poll_join_next until it returns Poll::Pending