LemmyNet / lemmy

🐀 A link aggregator and forum for the fediverse
https://join-lemmy.org
GNU Affero General Public License v3.0
13.29k stars 884 forks source link

[Bug]: Federation throughput in a synchronous manner is limited to network distance #4529

Closed ticoombs closed 2 months ago

ticoombs commented 8 months ago

Requirements

Summary

Problem: Activities are sequential but requires external data to be validated/queried that doesn't come with the request. Server B -> A, says here is an activity. In that request can be a like/comment/new post. An example of a new post would mean that Server A, to show the post metadata (such as subtitle, or image) queries the new post.

Every one of these outbound requests that the receiving server does are:

Actual Problem

So every activity that results in a remote fetch delays activities. If the total activities that results in more than 1 per 0.6s, servers physically cannot and will never be able to catch up. As such our decentralised solution to a problem requires a low-latency solution. Without intervention this will evidently ensure that every server will need to exist in only one region. EU or NA or APAC (etc.) (or nothing will exist in APAC, and it will make me sad) To combat this solution we need to streamline activities and how lemmy handles them.

Steps to Reproduce

  1. Have a lemmy server in NL send activities faster that 1 request every 0.6 seconds to a lemmy server in australia.
  2. If you send New Post activities, they can affect the activity processing the most / are the longest to help validate the PoC.

Technical Details

Trace 1:

Lemmy has to verify a user (is valid?). So it connects to a their server for information. AU -> X (0.6) + time for server to respond = 2.28s but that is all that happened.

- 2.28s receive:verify:verify_person_in_community: activitypub_federation::fetch: Fetching remote object http://server-c/u/user
- request completes and closed connection

Trace 2:

Similar to the previous trace, but after it verfied the user, it then had to do another from_json request to the instance itself. (No caching here?) As you can see 0.74 ends up being the server on the other end responding in a super fast fashion (0.14s) but the handshake + travel time eats up the rest.

- 2.58s receive:verify:verify_person_in_community: activitypub_federation::fetch: Fetching remote object http://server-b/u/user
- 0.74s receive:verify:verify_person_in_community:from_json: activitypub_federation::fetch: Fetching remote object http://server-b/
- request continues

Trace 3:

Fetching external content. I've seen external servers take upwards of 10 seconds to report data, especially because whenever a fediverse link is shared, every server refreshes it's own data. As such you basically create a mini-dos when you post something.

- inside a request already
- 4.27s receive:receive:from_json:fetch_site_data:fetch_site_metadata: lemmy_api_common::request: Fetching site metadata for url: https://example-tech-news-site/bitcoin-is-crashing-sell-sell-sell-yes-im-making-a-joke-here-but-its-still-a-serious-issue-lemmy-that-is-not-bitcoin

Trace 4:

Sometimes a lemmy server takes a while to respond for comments.

- 1.70s receive:community: activitypub_federation::fetch: Fetching remote object http://server-g/comment/09988776

Version

0.19.3

Lemmy Instance URL

No response

sunaurus commented 8 months ago

Just collecting some further points from discussions on Matrix:

Nutomic commented 8 months ago

Here are some potential solutions.

db0 commented 8 months ago

Solution through brainstorming on matrix to parallelize sending requests

activity parent ID
post A post A 1
comment A post A 2
vote A post A 3
vote B post B 4
ban user parentless 5
edit comment A (1 attempt) post A 6
edit comment A (2 attempt) post A 7
edit comment B post B 8

We end up with the following sending queues

Queue A ids: 1,2,3,6,7 Queue B ids: 4,8 Queue C ids: 5

Now each queue parallel to and independent from other other queues, follows the existing logic to send its post to the target instance. So queue A will send 1 then 2 then 3 etc. If any fails, the queue aborts anything subsequent. So if queue A failed to send ID 3, it will abort, so that 3,6 and 7 will remain in the overarching queue to send.

However the failure of 3, will not stop 4,8 and 5 from going through.

Now on the next iteration of sending, 1,2,4,8,5 are gone, so the next queues will pick up 3,6,7 to send, along with 7 other IDs and split them into individual queues.

This would allow an instance to send 1-n requests in parallel without ever running the risk of sending them in the wrong chronological order.

Nutomic commented 8 months ago

@db0 In particular we can assign activities to a specific queue by post_id: queue_id = post_id modulo 10 (or alternatively using community_id).

db0 commented 8 months ago

You mean so that you have different posts per queue? Sure that could work as well. I like the idea of using the exact post ID for easier troubleshooting potential myself.

phiresky commented 8 months ago

Apparently our HTTP client doesnt keep connections alive

Do you have a source on that? As far as I'm aware it does keep them alive and thus for replication has one persistently open connection.

Other than that, I'll restate what I wrote on Matrix:

I think we can solve this in a fairly simple way without any robustness issues:

Instead of sending one activity, waiting for the 200 response, then sending the next activity, we can instead send multiple activities (sequentially down the same HTTP connection) and only start waiting for a response when 8 are currently in transit.

The only thing unclear to me is how this can be implemented. On tokio's side using a channel with .buffered(8) on the receiving end should work, but I'm not sure how to make reqwest send multiple requests while still ensuring they are in the same sequential connection.

This way, the internet latency does not matter as long as the parallel number chosen (8) is > than the internet latency divided by the internal processing time. E.g. if ping is 320ms and internal processing is 10ms, then 32 requests would need to be in the queue at once to not slow things down.

Note that this does not change anything about the internal latency of an instance, that would still be sequential per receiving instance. But it removes all the latency outside of the Rust process.

Nutomic commented 8 months ago

Maybe we can use something like this to get notified when the entire activity is sent, and then send out the next one. However that would require changes inside the federation library and exposing it in the public api. It would be easier and probably sufficient in practice if we simply wait eg 100ms between requests.

wereii commented 8 months ago

@phiresky The only thing unclear to me is how this can be implemented. On tokio's side using a channel with .buffered(8) on the receiving end should work, but I'm not sure how to make reqwest send multiple requests while still ensuring they are in the same sequential connection.

Is this HTTP Response Streaming (Transfer-Encoding: chunked) or batching multiple activities into one request [{...}, {...}, ...] ?

For chunked these seem to be relevant in Reqwest: https://docs.rs/reqwest/latest/reqwest/struct.Body.html#method.wrap_stream https://docs.rs/reqwest/latest/reqwest/struct.Response.html#method.bytes_stream

Also a good note is that HTTP/2 basically always does chunked encoding, so the above applies only to 1.1.

db0 commented 8 months ago

, but I'm not sure how to make reqwest send multiple requests while still ensuring they are in the same sequential connection.

That is exactly the problem with parallel requests in isolation and it's non-trivial to solve, which is why I suggested a more robust split-queue

phiresky commented 8 months ago

Why is it non-trivial to solve? if the library provides an option to do it it's trivial to solve. the only problem is that by default it apparently uses HTTP2-pipelining which is not in-order

db0 commented 8 months ago

if the library provides an option to do it it's trivial to solve

Does the library provide an option then?

phiresky commented 8 months ago

Is this HTTP Response Streaming

It's HTTP pipelining image

The docs you sent seem to be about responses, but here we really only care about request and request body

db0 commented 8 months ago

This helps you send multiple requests, not ensure they're processed in order which is the non-trivial part

phiresky commented 8 months ago

they are guaranteed to be sent in order and received in order so processing them in order should not be hard

db0 commented 8 months ago

so processing them in order should not be hard

famous last words :D

I don't think the http pipelining ensure that the response you get will be 200 before sending the next request. only that there will be a response. How does it guarantee for example that a request with ID 1 won't take too long to process before ID 2 hits the target?

phiresky commented 8 months ago

on the receiving side the requests can still be processed fully sequentially, no need for parallelism. note this only solves the problem in this ticket and not the one you had with internal latency

db0 commented 8 months ago

Very important from the article in wikipedia

image

From the linked source

image

If I'm not mistaken, the POSTS to inbox, are not idempotent. A connection loss while sending will cause issues.

phiresky commented 8 months ago

it does look like reqwest doesn't really have options for this, probably since http1.1 pipelining is mostly seen as superseeded by http2 multiplexing, which is better in most ways except that it doesn't guarantee any sequentiality. so probably we should use just normal http2 multiplexing (which should just work with .buffered())

I think we can semi-ignore the sequentiality problem (e.g. just add a 10ms delay between sends) though because:

If an instance is catching up, and processing e.g. 100ms per activity, and we have a parallelity of 8, then while the first 8 events will be sent simultaneously and potentially processed out of order, after that each next request will wait for the OK response 8 requests earlier, which means it will have a average delay of 10ms after the previous activity.

db0 commented 8 months ago

If you don't ensure for sequential you can end up with situations of the previous edits or votes overriding newer edits/votes if they are in the same pipeline. Likewise, a connection loss in the middle of the request will cause you issues which you can't resolve by resending, since you're not idempotent.

phiresky commented 8 months ago

Another alternative that's still simpler and less overhead than adding per-thread or per-community queues would be to add a field to each json activity or http request header with the sequential id and on the receiving side add (if the number is given) a priority queue to which events are added and processed immediately if the sequential id was incremented by 1 and with a max delay of 10s or so if it was incremented by > 1

db0 commented 8 months ago

Anything which extends the apub protocol, will not be handled by other software properly.

phiresky commented 8 months ago

it's a compatible change so it doesn't affect anything else

db0 commented 8 months ago

What do you mean by "compatible change"?

phiresky commented 8 months ago

it just adds a header or json field that will be ignored if not understood.

pseudo code:

post(/inbox).handle(activity):
     queues[activity.instance].add(priority=activity.sequence_id, activity)

def queue_process():
     queue = queues["lemmy.world"]
     last_id = 0
     while true:
         if queue.peek != last_id + 1: sleep 10s
         process(queue.take())
db0 commented 8 months ago

That's my point. If you rely on something that can be ignored, then non-lemmy software (kbin, sublinks, mastodon, piefed etc) can experience significant issues, such as bad edits, missing content when issues occur

phiresky commented 8 months ago

but i have to say since completely ignoring activity sequence in a much much worse way than the thing above is how all of lemmy worked before 0.19 i don't think this minor sequencing issue is really going to cause trouble. on high-volume instances with a parallelity of like 10 it's very unlikely there's a conflicting event is <10 events next to another conflicting one because you don't edit a post 10 times per second, and on less active instance it's also not going to be an issue because the parallelity doesn't actually have to come into effect

db0 commented 8 months ago

You can't know that though. We can't foresee all potential issues. but we know that this approach has potential pitfalls from simple connection problems. I don't see the point of refactoring to a method with includes known risks and does exactly what the designers of http pipelining tell you not to do.

PS: I am known to hit the wrong vote and correct it within the same second. Somtimes even setting whole posts as stickies through fat fingers

phiresky commented 8 months ago

i see what you're saying, though what i wrote now is now unrelated to http pipelining. imo we can do the simple solution and if it actually causes problems we can still add the sequence enforcing later. the first is a prerequisite for the second anyways so implementing it is not a waste even if it'll cause (unlikely) issues

phiresky commented 8 months ago

splitting by community or post sounds neat but it's pretty complicated and sound like the kind of thing that's very hard to tune to balance the overhead, and the kind of thing where instance admins have no idea how to tune it and complain that lemmy has gotten slower because suddenly it creates 1000 queues per instance instead of 1 and you can't even tell what those config parameters should be without trying it out in production

Nutomic commented 8 months ago

I agree we should go for simpler solutions if possible. No point overengineering something which may not even work in the end. The activity sequence id makes sense in this way. It would be good to write a short FEP about it as it will also be relevant for other projects.

db0 commented 8 months ago

I am all for KISS, but I am very worried that random connection problems are going to cause hard-to-replicate issues which will leave admins head-scratching. In this case already we know that parallel requests + no sequencing + no idempotency is going to be a problem. I also think the performance concern is exagerrated. Using the formula I provided above it shouldn't ever create more than 1 parallel action request until you start falling back massively anyway. I also strongly suggest you don't go around extending apub just for lemmy.

Nutomic commented 8 months ago

Lemmy already extends Activitypub in various ways, one more extra field wont make any difference. Plus we can get the extension standardized, that way other platforms which face the same problem can also benefit from it.

rimu commented 8 months ago

Many activities already have a 'published' field, containing a datetime. If that field was on every activity, could you use that for sequencing? activity_with_published

Nutomic commented 8 months ago

@rimu Thats a very good suggestion! Using published field means we dont need to pass community_id or similar into federation, and dont need to keep track of last sent/received id. That means we can likely implement it entirely in the federation library. Its also good that other platforms already support it.

Outgoing federation would be pretty simple then, just send a number of requests in parallel. For incoming federation we have to store it in a queue ordered by published time, and then process each activity once it is eg one second old (in case an earlier activity gets delivered later).

Nutomic commented 8 months ago

I implemented a fix in https://github.com/LemmyNet/lemmy/pull/4559

phiresky commented 8 months ago

After looking at #4559 I have a few more comments. I'll put it as bullet points without any proper conclusion.

Here's the issues we have:

  1. We need backpressure. A sending instance needs to get feedback from a receiving instance to limit the sending throughput.
    • Otherwise, the receiving instance can unintentionally be DOSed.
    • Currently this is solved in Lemmy due to the full sequentiality of activity sending. If the receiver is too slow, the sender will slow down sending.
    • This is especially an issue if one of the instances was down for a while and needs to catch up.
    • In #4559, this is not currently solved.
  2. If there's high ping / network latency, lemmy can fall behind.
    • Currently, the sender waits for every activity to be done which means that the duration of a full roundtrip is an upper bound for the throughput.
    • This seems to mainly be an issue for instances hosted in Australia/NZ, physically far away from the large instances. A ping of 150ms is slower than the current throughput of lemmy.world activities.
  3. If processing an activity is slow, lemmy can fall behind.
    • Currently, since activities are processed sequentially, the internal delay limits the activity throughput.
    • Currently, this seems to mainly have been an issue with what could be considered a misconfigured Lemmy instance (db0), where the database was far away from the lemmy server instance.
    • A well sized instance seems to have a processing time of around 10ms per activity, allowing a theoretical 100 activities per sending instance per second.
    • So far my opinion has been that (if convenient) we can ignore this problem until we actually see it happen since 100 activities per second should be enough for a while.
  4. Activities have dependencies and not all activities are commutative. If activities are processed out of order, this causes issues.
    • The currently known cases include:
      • Upvote vs Downvote - if the later vote arrives first, then the changed vote will be overwritten by the initial vote of the user.
      • Post/Comment edits - if a user edits content multiple times, the edit may be overwritten by an earilier edit.
    • Most other activities don't have this problem. For example, a comment on a post should first trigger fetching of the post if the post is not yet known. Then the post creation will be a no-op.

Here's possible (partial) solutions:

  1. Send activities in parallel with max-inflight = e.g. 10
    • This solves 1, 2 and 3 but causes 4.
    • This should work well with the persistent queue - the last_successful_sent activity id would just be the lower bound of activity IDs - in case of issues, the worst case would be to dupe resend up to 10 activities.
    • This is (afaik) one part of #4559
  2. Make all activities commutative - that is, make their order not matter.
    • Activities are already idempotent due to the received_activities table - but their order matters.
    • In combination with (5) this solves all four problems.
    • For example: If a post is edited, the new edit timestamp could be compared to the previous edit, and if it is older it could be ignored.
    • This only works for activities that fully replace the dependent activity. I'm not sure if this is the case everywhere (votes?)
    • This potentialy doesn't solve the problem for non-lemmy software (unknown?)
  3. On the sending side, create N queues per receiver and split activities by dependency chains (as suggested by db0 above)
    • This solves (1,2,3,4) without changes to the receiving side.
    • It's difficult to tune N: there's a high overhead for having too many queues, both for the in-memory overhead as well as the reduced activity cache efficiency when there's many cursors at different positions in the queue. Each instance would need different N probably.
    • It's also unclear how this would interface with the current sending queue architecture in general.
    • It's difficult to decide which queue to add each activity to unless a simple method such as community_id%N is used.
  4. On the receiving side, create a receiving queue per sending instance, delay receives by 1-5s, and process them per instance by order of timestamp
    • Together with (5) this solves (1,2,4) assuming each individual HTTP latency is in the same range of 1-5s.
    • It does not solve 3 because each receiving queue is still sequential.
  5. On the sending side, still have a single persistent queue per receiver, BUT create a buffer of N (e.g. = 20) activities. Send the HTTP requests in parallel, but for each affected community id within the buffer send the activities in sequence.
    • This is similar to (7) but should be easier to implement (though not 100% how it would look)
    • This solves (1, 2, 3, 4) as long as N is large enough.
    • Tuning N is not trivial but should be easier than (7) (less overhead).
ticoombs commented 8 months ago

Thanks to everyone trying to help fix this problem. It means a lot to me and everyone else.

As a way to speed up a temporary solution. Might I suggest implementing a queue on the receiving side specifically for all metadata fetches? Currently metadata fetches are blocking in the activity responses and are my biggest contributors to responding fast enough.
Youtube regularly takes 2+ seconds, not to mention if a site gets the hug of death due to ~1000 servers all trying to get metadata (see Trace 3 in op).

This wouldn't require all of the sending sides to be fixed, but only the receiving sides. We (receiving) would return a 20x faster due to lack of blocking requests and would ingest the activities at a faster rate because of it. I'd be happy to build & run the PoC.

While this would not solve the upper limits of 1 request per 250ms due to network latency (points 2/3) of phiresky's comment, it would reduce the 2-10+ second queries to <1s.

Also we could take inspiration from mastodon regarding their queue categories & processing queues. Giving priority to certain actions when applicable. For my above example, I don't mind if a post doesn't have metadata attached to it, as long as it eventually does.

phiresky commented 7 months ago

Currently metadata fetches are blocking in the activity responses

I was not aware metadata fetches were blocking for inbox receivals. If true, that definitely sounds like something that should happen asynchronously. Although, I assume this only happens for new posts? I'd expect posts to be less than a few percent of all the requests, with votes (and comments and edits) dominating.

Nutomic commented 7 months ago
  1. Send activities in parallel with max-inflight = e.g. 10
    • This should work well with the persistent queue - the last_successful_sent activity id would just be the lower bound of activity IDs - in case of issues, the worst case would be to dupe resend up to 10 activities.

This is actually a problem in #4559, it immediately updates last_successful_id when an activity is sent, regardless if the previous activity was successful or not. So I suppose all send results need to be collected in a single place, and only all subsequent ones were successfully updated, write it to db. This could be done by moving federation state writes into receive_print_stats(), storing most recent sends in memory. However it gets tricky as not every activity gets delivered to every instance.

  1. Make all activities commutative - that is, make their order not matter.

We should definitely do this and use timestamps, because sending sequentially wont use all available server resources. So we need to be able to send in parallel. I dont think there is a general solution here but we need to handle problems individually for each activity type (eg checking the edit timestamp like you say).

  1. On the sending side, create N queues per receiver and split activities by dependency chains (as suggested by db0 above) \9. On the sending side, still have a single persistent queue per receiver, BUT create a buffer of N (e.g. = 20) activities. Send the HTTP requests in parallel, but for each affected community id within the buffer send the activities in sequence.

I dont see any difference between these. Anyway it could be implemented by storing the community_id in SentActivity table, and then each worker being assigned tasks as community_id mod N. I dont see a need for something more complicated here. As discussed above we could instead assign by post_id, so that a single large community can still be spread across multiple workers.

  1. On the receiving side, create a receiving queue per sending instance, delay receives by 1-5s, and process them per instance by order of timestamp

This is implemented in #4559.

Also I moved metadata fetches to a background task in https://github.com/LemmyNet/lemmy/pull/4564.

phiresky commented 7 months ago

Here's an implementation in pseudocode of what i wrote above in point (9). It would be great to get comments (especially from @Nutomic since it's an alternative to their PR) before I implement it properly.

To recap, the advantage of doing this on the sending side is that this way the queue is guaranteed to be persistent - if done on the receiving side we either need to add a second persistent queue to the database or have the danger of losing activities since the 200 response happens before anything reaches a storage medium. Backpressure comes from limiting the number of in-flight requests.

/// For each community, we keep a queue of sends.
/// Sends happen in parallel for different communities, but in sequence for one community.
/// This is a map that tracks which community we are currently sending an activity for
/// and the dependent activities we can only send afterwards
struct PendingSendsMap {
    // the first entry in the map value here is always the activity that is currently being sent
    map: HashMap<community_id, VecDequeue<Activity>>;
    // total count of activities in all map values so we don't use unbounded memory
    total_count: i64;
}
impl PendingSendsMap {
    /// Add an activity to the blocklist ONLY if there's already an activity send currently happening for that community
    fn push_if_has(self, activity) -> bool {
        if let Some(entry) = self.map.entry(activity.community_id) {
            self.total_count += 1;
            entry.push_back(activity);
            true
        } else { false }
    }
    fn pop_next(self, community_id) -> Option<Activity> {
        if let Some(entry) = self.map.entry(community_id) {
            let activity = entry.pop_front();
            assert activity.is_some(); // empty lists can't happen
            self.total_count -= 1;
            if entry.len() == 0 { self.map.remove(entry); }
            activity
        } else { None }
    }
    /// how many activities we currently have in RAM
    fn total_in_ram() -> i64 {
        self.total_count
    }
    /// how many activities are currently being sent out (regardless of how many blocked ones are held in RAM)
    fn total_in_flight() -> i64 {
        self.map.len()
    }
}
let blocked = PendingSendsMap::new();

fn sending_loop() {
    loop {
        while blocked.total_in_ram() >= 1000 || blocked.total_in_flight() >= 10 {
            wait_for_one_send_to_complete().await;
        }
        let Some(activity) = get_next_activity() else {
            // everything already sent, end loop.
            break;
        }
        if !blocked.push_if_has(activity) {
            spawn(send_task(activity))
        }
    }
}

fn send_task(activity) {
    retry_loop { // as before
        http_request(activity).await;
    }
    // success! 
    let act = blocked.get(activity).pop_front();
    assert act == activity;
    if let next = blocked.pop_front() {
        spawn(send_task(next));
    }
    signal_send_complete(); // can be done with a mpsc channel sending `()`
    // todo: keep track of last_successful_id
}

Note that in real code i would not use community_id anywhere and instead using an abstract "queue id" which can for now be community_id but may later be updated to be coalesce(affected_post_id, affected_community_id) when appropriate for more parallelisation.

phiresky commented 7 months ago

I dont see any difference between these [(7 and 9)]

The difference is only in what state is persisted and needs to be kept track of. I imagined (7) as having fully separate persistent queues per community or per post, which means the federation_queue_state table would grow from 1k entries to 100k+ entries, with 100+k tasks reading and writing to that table. 9 on the other hand keeps one persistent queue per instance but within each one creates tiny in-memory queues with a bounded total size (e.g. 100 in the above code). On crashes, the in-memory queue is lost and restarts sending from at most 100 events earlier.

phiresky commented 7 months ago

This is actually a problem in https://github.com/LemmyNet/lemmy/pull/4559, it immediately updates last_successful_id when an activity is sent, regardless if the previous activity was successful or not

If we don't care about send order at all (as you do in your PR), this can be solved with a tokio Stream with .buffered(N). That way N futures from the stream happen in parallel but the results retrieved from the stream are still guaranteed to be in order.

For my above implementation, I'm not sure yet how to get the lowest successful id cleanly.

Nutomic commented 7 months ago

Im not entirely sure how your BlockListMap would work. Instead of that it would also be possible to change inflight_requests variable in my pr from an int to Vec`, then when sending an activity, check if there is already a request inflight for this community and wait for it to finish.

Also its probably a bad idea to specify the number of send workers per target instance. I think it makes more sense to have a pool of send workers so you can configure the total number of concurrent, outgoing requests (for all target instances). Then more workers can be assigned to a larger target instance when necessary. This could be implemented by having InstanceWorker only loop through SentActivity to determine which activities need to be delivered, and pass them to send workers pool via UnboundedChannel. Finally each send worker passes the result to receive_print_stats() where it gets cached and then written to db. That way database reads and writes are each done sequentially in a single place, and only the network requests are actually in parallel.

phiresky commented 7 months ago

then when sending an activity, check if there is already a request inflight for this community and wait for it to finish.

That alone doesn't work, because you don't just need to wait for the current in-flight activity to finish, you also need to wait for other not-yet-inflight activities that are also waiting to finish. So you need a Vec per inflight_requests. That's what the BlockListMap is! a map from community id to list of waiting requests for that community

I think it makes more sense to have a pool of send workers

Maybe but then how exactly do you assign the numbers? How would you prevent a huge instance from starving out smaller instances?

phiresky commented 7 months ago

Also remember that it's not really "workers", rather there's a single permanent task per instance, and then each individual HTTP request is one task. The concurrency is limited per instance, but for instances that are up to date nothing is actually running apart from the permanent task

and pass them to send workers pool via UnboundedChannel

It definitely can't be unbounded because then you'll just load millions of messages into memory again if an instance is out of date.

Nutomic commented 7 months ago

Does it really make sense to have the per-community queue on top of the per-instance queue? Seems like that would be two kinds of parallelism on top of each other. So maybe it would make sense to replace the current InstanceWorker with a CommunityWorker that does essentially the same thing, per community (or per post). Though it would also require a way to cleanup inactive senders.

phiresky commented 7 months ago

That's what I meant with variant 7 vs 9: https://github.com/LemmyNet/lemmy/issues/4529#issuecomment-2018272003

What you describe is not really possible because:

In addition, we do (probably) want to limit requests per-instance and not per-community, which is not possible if each community would be handled fully separately

phiresky commented 7 months ago

An alternative to the tiny in-memory queues per community this is another option I thought of:

buffer = [] // the next 100 activities we might want to send
currently_sending_communities = Set()

every time a send succeeds (or every time interval):
    linear search the buffer for elements that are not in currently sending communities.
         remove activity from the buffer, add to the sending set, and send it

But I revised it to the Map<id, List<Activity>> because it feels better

Nutomic commented 7 months ago

Honestly I don't think sending all activities sequentially is the solution. Even a single community can become so large that it becomes too much for a small, remote instance sequentially. Especially when there is another spam attack, or when the target instance needs to catch up after some downtime. We could repeat to split by post_id, but even that may not be enough.

The idea of my pr is that things don't need to be in order during sending, but can be put in the correct order by the receiving side. This has unlimited scaling potential simply by increasing the amount of parallel workers, and is also much easier to implement.

phiresky commented 7 months ago

I kinda agree and it's what I thought too before I read your PR code and realized all those complexities and that it's not really simpler. You still need a solution for what happens during server restart or server crash so no activities are lost, and right now you have the same sequential processing per community due to the per-community queue and the issue you have otherwise with activities being processed out-of-order. To fix that you (I think) need the same data structure I made above (it could also be on the receiving side but that's not really any better).