LemmyNet / activitypub-federation-rust

High-level Rust library for the Activitypub protocol
GNU Affero General Public License v3.0

Persistent storage for activity queue #31

Open Nutomic opened 1 year ago

Nutomic commented 1 year ago

The queue for sending outgoing activities keeps activities that failed to be delivered in memory, so they can be retried later when the target server is hopefully reachable again. As it's only in memory, this storage is gone after a restart or crash. It would be good to provide a config option for storing it on disk, e.g. in a sled database.

GeorgeLS commented 1 year ago

Hello there @Nutomic. I would like to work on this issue. Should we go with using sled or look for alternative solutions? We could implement something on our own as well but sled looks pretty optimized.

Some questions first:

Also, I have another comment, which we could track in a separate issue: I see that sled has support for batch operations. It would be good to apply the same kind of batching to the send action at the activity level as well. Let me explain what I mean using the example below:

We have two servers: A and B. We have a user x on server A who is followed by n users on server B. When user x posts something, server A has to make n requests to server B in order to share the information. Instead, we could do a batch operation and send one request, with the information that the request data should be shared among the n users of server B. This is more performant and creates less traffic.

Disclaimer: I don't know if this is implemented right now in Lemmy. From my understanding, it is not. Also, I don't know if this kind of behaviour is compatible with the ActivityPub protocol.
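
A minimal sketch of the batching idea above, assuming each follower record carries an optional shared inbox URL (ActivityPub defines an optional sharedInbox endpoint for this kind of delivery). The `Follower` struct, its field names, and `delivery_targets` are hypothetical and are not taken from Lemmy or this crate.

```rust
use std::collections::BTreeSet;

/// Hypothetical follower record; the field names are assumptions for this sketch.
#[derive(Debug, Clone)]
pub struct Follower {
    pub actor_id: String,
    /// Personal inbox of the follower.
    pub inbox: String,
    /// Inbox shared by all actors on the follower's server, if the server advertises one.
    pub shared_inbox: Option<String>,
}

/// Returns the deduplicated set of inbox URLs to deliver to,
/// preferring the shared inbox whenever a follower has one.
pub fn delivery_targets(followers: &[Follower]) -> BTreeSet<String> {
    followers
        .iter()
        .map(|f| f.shared_inbox.clone().unwrap_or_else(|| f.inbox.clone()))
        .collect()
}
```

With n followers on server B that all advertise the same shared inbox, the returned set contains a single URL, so server A sends one request instead of n.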

sunaurus commented 1 year ago

My 2 cents regarding the downsides of Sled:

As lemmy_server already depends on Postgres, why not just leverage that? We are already writing all activities into the database, so we know that the database can handle it.

If more speed is required and/or if reliability of data is not a huge concern, then I would still rather use Redis than Sled, just because Redis is easy to host on its own server, and has great tooling.

phiresky commented 1 year ago

We were discussing this on Matrix, and @cetra3, @sunaurus and I were all of the opinion that a good first attempt at this could use PostgreSQL. It could look something like the following (pseudocode):

-- uses the existing activity table for most of the data
CREATE TYPE federation_type as enum ('comment', 'post', 'comment_vote', 'post_vote');
CREATE TYPE federation_state as enum ('todo', 'in_progress', 'gave_up');
CREATE TABLE federation_queue (
    id bigserial primary key,
    -- column type assumed to match activity.id
    activity_id bigint not null references activity (id),
    created timestamptz not null,
    inbox_url text not null,
    federation_type federation_type not null,
    state federation_state not null,
    retries bigint not null default 0
);

The main process would insert rows into federation_queue. A (potentially separate) process would take them out using the following:

WITH batch AS (
    SELECT id FROM federation_queue WHERE state = 'todo'
    ORDER BY XXXX LIMIT 1000 FOR UPDATE SKIP LOCKED
)
UPDATE federation_queue SET state = 'in_progress' FROM batch WHERE batch.id = federation_queue.id
RETURNING federation_queue.activity_id, federation_queue.id;

That way, we get the following:


Implementation:

The PostgreSQL part would probably live in Lemmy. In activitypub-queue there would be a trait, something like:

use url::Url;

/// Placeholder for the serialized activity (assumption: the concrete type is not decided here).
pub type ActivityJson = serde_json::Value;

pub trait PersistentFederationQueue {
    /// Adds an activity to the queue. TODO: figure out the interaction with the activity table, since the activity is already stored there on the Lemmy side.
    fn queue(&self, activity: ActivityJson, inboxes: Vec<Url>);
    /// Retrieves a batch from the database and sets its state to "in_progress". Returns (id, activity as JSON, inbox URLs).
    fn get_batch(&self, limit: i64) -> Vec<(i64, ActivityJson, Vec<Url>)>;
    /// Deletes the entries or marks them as finished.
    fn mark_finished(&self, ids: Vec<i64>);
    /// Requeues requests that have failed (state "in_progress" back to "todo"). The database should track the retry count and work out the delays, I guess.
    fn requeue(&self, ids: Vec<i64>);
}
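
To illustrate the semantics of the four methods, here is a minimal in-memory sketch of such a queue. It is an assumption-laden stand-in, not this crate's API: `InMemoryQueue`, `State` and `Entry` are hypothetical, URLs and activities are plain `String`s to avoid external crates, and the methods take `&mut self` and slices for simplicity. A real implementation would back this with a database.

```rust
use std::collections::BTreeMap;

// Assumptions: activities and inbox URLs are plain strings in this sketch.
type ActivityJson = String;
type InboxUrl = String;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Todo,
    InProgress,
}

#[derive(Debug, Clone)]
struct Entry {
    activity: ActivityJson,
    inboxes: Vec<InboxUrl>,
    state: State,
    retries: u32,
}

pub trait PersistentFederationQueue {
    fn queue(&mut self, activity: ActivityJson, inboxes: Vec<InboxUrl>);
    fn get_batch(&mut self, limit: usize) -> Vec<(i64, ActivityJson, Vec<InboxUrl>)>;
    fn mark_finished(&mut self, ids: &[i64]);
    fn requeue(&mut self, ids: &[i64]);
}

/// Hypothetical in-memory implementation, useful only to show the state transitions.
#[derive(Default)]
pub struct InMemoryQueue {
    next_id: i64,
    entries: BTreeMap<i64, Entry>,
}

impl InMemoryQueue {
    /// Number of times the entry has been requeued, if it is still in the queue.
    pub fn retries(&self, id: i64) -> Option<u32> {
        self.entries.get(&id).map(|e| e.retries)
    }
}

impl PersistentFederationQueue for InMemoryQueue {
    fn queue(&mut self, activity: ActivityJson, inboxes: Vec<InboxUrl>) {
        self.next_id += 1;
        let entry = Entry { activity, inboxes, state: State::Todo, retries: 0 };
        self.entries.insert(self.next_id, entry);
    }

    fn get_batch(&mut self, limit: usize) -> Vec<(i64, ActivityJson, Vec<InboxUrl>)> {
        // Claim up to `limit` entries in id order and mark them as in progress.
        self.entries
            .iter_mut()
            .filter(|(_, e)| e.state == State::Todo)
            .take(limit)
            .map(|(id, e)| {
                e.state = State::InProgress;
                (*id, e.activity.clone(), e.inboxes.clone())
            })
            .collect()
    }

    fn mark_finished(&mut self, ids: &[i64]) {
        for id in ids {
            self.entries.remove(id);
        }
    }

    fn requeue(&mut self, ids: &[i64]) {
        // Only entries that were claimed go back to "todo"; the retry count is tracked here.
        for id in ids {
            if let Some(e) = self.entries.get_mut(id) {
                if e.state == State::InProgress {
                    e.state = State::Todo;
                    e.retries += 1;
                }
            }
        }
    }
}
```

A worker would call `get_batch`, attempt delivery, and then pass the successful ids to `mark_finished` and the failed ones to `requeue`. Entries that are claimed but never reported back stay in progress, which is the case a real implementation has to handle after a crash.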

Nutomic commented 1 year ago

Agree with phiresky, although the queue function needs to look more like this:

fn queue(&self, task: SendActivityTask, num_retries: u8);

The num_retries param can be used so that Lemmy can avoid storing the first and second retry (after 60s, 60m), because these would cause excessive db writes.
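
A rough sketch of how an application could use num_retries to skip the early, short-lived retries. It assumes the delay before the n-th retry is 60^n seconds, which matches the two values quoted above (60s, then 60m); anything beyond that is an extrapolation. `retry_delay`, `should_persist` and `PERSIST_AFTER_RETRIES` are hypothetical names, and num_retries is taken to mean the number of retries already attempted.

```rust
use std::time::Duration;

/// Delay before retry number `retry` (1-based), assuming it grows as 60^retry seconds.
/// Only the first two values (60s, 60m) are stated in the discussion.
pub fn retry_delay(retry: u32) -> Duration {
    Duration::from_secs(60u64.saturating_pow(retry))
}

/// Hypothetical threshold: tasks are written to storage only once the
/// first and second retry have already failed.
pub const PERSIST_AFTER_RETRIES: u8 = 2;

/// Decides whether a task that has already been retried `num_retries` times
/// should be written to persistent storage or kept in memory only.
pub fn should_persist(num_retries: u8) -> bool {
    num_retries >= PERSIST_AFTER_RETRIES
}
```

Under this policy a task waiting for its first or second retry lives only in memory, and the database is touched only for tasks that would otherwise wait a long time and be lost on restart.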

In any case we currently need to make some major changes to the activity queue code in order to fix performance problems. This persistent storage would cause git conflicts and could worsen performance even more, so now is not a good time to implement it.

phiresky commented 1 year ago

> The num_retries param can be used so that Lemmy can avoid storing the first and second retry (after 60s, 60m), because these would cause excessive db writes.

I thought so as well, but one issue is that it prevents the whole dequeuing from running on separate processes/servers.

> In any case we currently need to make some major changes to the activity queue code in order to fix performance problems. This persistent storage would cause git conflicts and could worsen performance even more, so now is not a good time to implement it.

I guess, but this would also play a huge part in fixing those problems: it would allow proper prioritization based on hosts and on activity type, and it would move the queue out of the Lemmy process memory and off the main Lemmy CPU time. It's not just about robustness. But yeah, it would be a much larger change than simpler "fixes".

phiresky commented 1 year ago

I just realized something: There doesn't actually need to be a table that stores the send state for every inbox-activity combination like the CREATE TABLE federation_queue I described above. The existing activity table is enough as a persistent queue.

It's enough if there's a very tiny table like this:

CREATE TABLE outgoing_state (
      inbox_url text primary key,
      -- column type assumed to match activity.id
      last_successful_activity_id bigint not null references activity (id)
);

Then, for every receiver inbox, the dequeuer can just take a batch of the next 1000 activities from the activity table, filter them down to those that need to be received by this inbox, send them out, and update the last_successful_activity_id. If one of those activities fails to send, then the others won't work either (we just need to make sure it doesn't get stuck because of bugs).

That should reduce the storage and performance needed from the queueing system (e.g. PostgreSQL) by a factor of instance_count, so by something like 300x. @cetra3

It does increase the processing load if you have many instances that only subscribe to a tiny subset of content, but probably worth it.
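
The per-inbox cursor described above could be sketched as follows. This is only an illustration under stated assumptions: `Activity` is a simplified stand-in for a row of the activity table, `recipients` is a hypothetical field listing the inboxes that should receive it, and `send` is a hypothetical delivery callback that returns `true` on success.

```rust
/// Simplified stand-in for a row of the activity table (assumption).
#[derive(Debug, Clone)]
pub struct Activity {
    pub id: i64,
    /// Inboxes that should receive this activity (hypothetical field).
    pub recipients: Vec<String>,
}

/// Processes one batch for a single inbox and returns the new value of
/// last_successful_activity_id. `activities` must be sorted by ascending id.
pub fn process_inbox<F>(
    inbox_url: &str,
    last_successful_activity_id: i64,
    activities: &[Activity],
    batch_size: usize,
    mut send: F,
) -> i64
where
    F: FnMut(&Activity) -> bool,
{
    let mut cursor = last_successful_activity_id;
    let batch = activities
        .iter()
        .filter(|a| a.id > last_successful_activity_id)
        .take(batch_size);
    for activity in batch {
        let relevant = activity.recipients.iter().any(|r| r == inbox_url);
        if relevant && !send(activity) {
            // Stop at the first failure; everything after it is retried in order next time.
            break;
        }
        // Delivered, or not addressed to this inbox: the cursor can move past it.
        cursor = activity.id;
    }
    cursor
}
```

Only one small row per inbox has to be written after each batch. The trade-off mentioned above is visible in the loop: every inbox scans the whole batch, even if only a few activities are addressed to it.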

colatkinson commented 1 year ago

Hey just wanted to check whether the implementation described above (this crate provides a PersistentFederationQueue trait, applications implement their own storage solution) is generally accepted as the way forward. I noticed the comment on https://github.com/LemmyNet/activitypub-federation-rust/pull/64, and so was unsure of the status.

For my personal use-case, BYO queue would be very helpful -- though if that ends up not being the actual implementation for whatever reason, I'll try and find a workaround of some description.

Nutomic commented 1 year ago

@colatkinson Can you explain why you want to bring your own queue? For me the use case isn't clear.

colatkinson commented 1 year ago

So in my case I'm already pulling in a different persistent distributed queuing system. While I haven't yet written a prototype of this part (so everything below should be considered speculative at best), my general inclination was to use it for sending activities as well. In addition, I currently support multiple RDBMSes -- so a dependency on any particular DB (be it Postgres or another) would be undesirable.

Beyond that, I'd expect users of the crate to have fairly diverse needs -- a single-node bot may just need some embedded DB, whereas an application concerned with high scalability may want to use e.g. Kafka. Similarly, I can imagine some applications requiring custom behaviors around prioritization or retry intervals.

That said, effectively my current plan is to apply a (vaguely hacky) patch downstream to expose a retry-less send_activity() to allow lifting this logic out of the crate -- while hopefully reducing the scope of potential merge conflicts you mentioned upthread.

Hopefully this explains it somewhat -- let me know if there's anything I can clarify further. Also happy to help with implementation efforts, though I obviously don't want to interfere with any performance work happening in this area.

cetra3 commented 1 year ago

Would #64 be in the right direction for what you're after?

Nutomic commented 1 year ago

@colatkinson This crate is not going to have any direct database dependency. It will only expose a trait like the PersistentFederationQueue mentioned above, so that applications can handle storage however they want.

For me it's still not clear what the advantages of bring-your-own-queue are. If you want to get rid of retries, it would be much easier to add a setting for that.

colatkinson commented 1 year ago

Ah there may have been a misunderstanding on my part -- I was assuming that bring-your-own-queue was referring to being able to implement a PersistentFederationQueue, and that the persistent queue would essentially replace the existing ActivityQueue.

My interest is primarily in having application-defined persistent queuing. If the plan is to keep ActivityQueue to shuffle messages from the PersistentFederationQueue to worker tasks or something like that, then yeah I agree BYO in-memory queue seems unnecessary for my purposes.