deltachat / deltachat-core-rust

Delta Chat Rust Core library, used by Android/iOS/desktop apps, bindings and bots 📧

Persistent events #4328

link2xt opened 1 year ago

link2xt commented 1 year ago

Events are stored in memory and may be dropped if the process crashes or if the channel overflows. Events::new() creates channels which can only hold up to 1000 events, and Events.emit() drops the oldest event if the channel is full.
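The drop-oldest behavior can be sketched with a bounded queue (illustrative Python, not the actual Rust code; the 1000-event capacity is the one mentioned above):

```python
from collections import deque

class EventChannel:
    """Illustrative model of Events.emit(): a bounded queue that
    drops the oldest event when full (capacity 1000 in core)."""

    def __init__(self, capacity=1000):
        self.queue = deque()
        self.capacity = capacity

    def emit(self, event):
        if len(self.queue) >= self.capacity:
            self.queue.popleft()  # oldest event is silently lost
        self.queue.append(event)

channel = EventChannel(capacity=3)
for i in range(5):
    channel.emit(f"event-{i}")
# events 0 and 1 were dropped
print(list(channel.queue))  # → ['event-2', 'event-3', 'event-4']
```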

Unreliable events are sufficient for clients that display the current state and rely on events only to get notified that a refresh is needed, but they are not good enough for bots that want to process all incoming messages. Because of this, the reliable way to implement an echo bot is to use the INCOMING_MSG event as a notification of an update, but then query for all fresh messages and mark them as seen once processed: https://github.com/deltachat/deltachat-core-rust/blob/28a13e98a66deda5da31f2b455e2fe6220dd75a3/deltachat-rpc-client/examples/echobot_no_hooks.py Even if an INCOMING_MSG event is lost, messages will be processed when the bot is restarted or receives another INCOMING_MSG event.
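The get-fresh-and-mark-seen loop from the linked example can be sketched like this (Python; FakeMsg/FakeAccount are stand-ins for the real bindings, and the method names here are illustrative):

```python
def process_all_fresh(account, handle):
    # Triggered by INCOMING_MSG, but correct even if events were lost:
    # it always processes *all* currently fresh (unseen) messages.
    for msg in account.get_fresh_messages():
        handle(msg)
        msg.mark_seen()  # ensures the message is not processed twice

# Minimal stand-ins to demonstrate the loop:
class FakeMsg:
    def __init__(self, text):
        self.text, self.seen = text, False
    def mark_seen(self):
        self.seen = True

class FakeAccount:
    def __init__(self, msgs):
        self.msgs = msgs
    def get_fresh_messages(self):
        return [m for m in self.msgs if not m.seen]

account = FakeAccount([FakeMsg("hi"), FakeMsg("ping")])
replies = []
process_all_fresh(account, lambda m: replies.append(m.text))
# A second run is a no-op: everything is already marked seen.
process_all_fresh(account, lambda m: replies.append(m.text))
```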

The get_fresh_msgs()-based approach is sufficient to reliably process all incoming messages and react to commands sent in messages, but it assumes that the account is used by only a single instance of the bot and relies on messages not being marked as seen or sent by other devices. To solve this problem, #4319 introduces an API which explicitly stores the last processed message database ID and can be used to process both incoming and outgoing messages even if the bot is used together with other devices. An example of such a bot is https://github.com/deltachat/deltaircd, which bridges all messages to a local IRC server and allows using any IRC client as an alternative Delta Chat client.
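The last-processed-ID idea can be sketched as follows (Python; `get_msgs_after` and the dict-based progress store are hypothetical stand-ins, not the real #4319 API):

```python
def process_next_msgs(store, account, handle):
    # Resume from the last processed message database ID, so both
    # incoming and outgoing messages are handled exactly once, even
    # across restarts and with other devices marking messages seen.
    last_id = store.get("last_msg_id", 0)
    for msg in account.get_msgs_after(last_id):  # hypothetical helper
        handle(msg)
        store["last_msg_id"] = msg["id"]  # persist progress per message

class FakeAccount:
    def __init__(self, msgs):
        self.msgs = msgs
    def get_msgs_after(self, last_id):
        return [m for m in self.msgs if m["id"] > last_id]

account = FakeAccount([{"id": 1, "text": "a"}, {"id": 2, "text": "b"}])
store, handled = {}, []
process_next_msgs(store, account, lambda m: handled.append(m["text"]))
process_next_msgs(store, account, lambda m: handled.append(m["text"]))  # no-op
```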

The get_next_msgs() approach from #4319 can be used to process all messages, but it still does not allow reliably reacting to chat member list changes, user and group profile changes, encryption configuration changes, message reaction changes and so on. To make this possible, the proposal is to assign sequence numbers to events and add a new API, wait_next_events, to poll for new events and mark them as processed.

The event ID is a signed 64-bit integer to make it compatible with SQLite ROWID. A persistent event is fully identified by its account ID and event ID. If persistent events are enabled, all events are stored in the database until a call to wait_next_events with an equal or higher last_event_id is issued. If persistent events are not enabled, all events have an implicit event ID equal to -1, possibly not serialized into JSON.
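This storage model can be sketched directly with SQLite, where an INTEGER PRIMARY KEY column is an alias for the signed 64-bit ROWID; table and column names here are illustrative, and unlike the proposed call this sketch returns immediately instead of blocking:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY, account_id INTEGER, payload TEXT)"
)

def emit(account_id, payload):
    # ROWID assignment gives a monotonically increasing event ID.
    db.execute("INSERT INTO events (account_id, payload) VALUES (?, ?)",
               (account_id, payload))

def next_events(account_id, last_event_id):
    # Events up to last_event_id are acknowledged and can be deleted...
    db.execute("DELETE FROM events WHERE account_id = ? AND id <= ?",
               (account_id, last_event_id))
    # ...anything newer is returned to the caller.
    return db.execute(
        "SELECT id, payload FROM events WHERE account_id = ? AND id > ?",
        (account_id, last_event_id)).fetchall()

emit(1, "IncomingMsg")
emit(1, "MsgsChanged")
batch = next_events(1, 0)             # both events, IDs 1 and 2
empty = next_events(1, batch[-1][0])  # acknowledges them
```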

The JSON-RPC API will consist of a single wait_next_events(account_id, last_event_id) call, which blocks until there is at least one event with an event_id greater than requested and returns a non-empty array of events. There may be an arbitrary upper limit on the number of returned events, decided by the server. A wait_next_events call can be interrupted by calling stop_io, which is guaranteed to emit an INFO event. If there are multiple accounts, they should all be polled in parallel using separate calls. This API should also work if persistent events are not enabled, and it should eventually replace the non-standard JSON-RPC notifications; in that case events are polled with last_event_id = -1 all the time. deltachat-rpc-server will keep only send_task and recv_task, while events_task, which converts events into notifications, will be removed. deltachat-rpc-client and other bindings such as the Go bindings will poll for events using wait_next_events internally; there will be no change in their external API, and bots will only need to decide whether to enable the persistent events setting.
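From a bot's perspective, the proposed call reduces to a simple polling loop (Python sketch; FakeRPC simulates the server side, and a KeyboardInterrupt stands in for the call being interrupted via stop_io):

```python
def event_loop(rpc, account_id, handle):
    last_event_id = -1  # also the implicit event ID when persistence is off
    while True:
        # Blocks until at least one event with a greater event_id exists.
        for event in rpc.wait_next_events(account_id, last_event_id):
            handle(event)
            last_event_id = event["event_id"]  # advance only once processed

class FakeRPC:
    def __init__(self, batches):
        self.batches = list(batches)
    def wait_next_events(self, account_id, last_event_id):
        if not self.batches:
            raise KeyboardInterrupt  # stand-in for stop_io ending the loop
        return self.batches.pop(0)

rpc = FakeRPC([[{"event_id": 1, "kind": "IncomingMsg"}],
               [{"event_id": 2, "kind": "MsgsChanged"},
                {"event_id": 3, "kind": "IncomingMsg"}]])
seen = []
try:
    event_loop(rpc, account_id=1, handle=lambda e: seen.append(e["event_id"]))
except KeyboardInterrupt:
    pass
```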

I have not decided yet how the C API should look, so at least initially there will be no C API for this feature.

link2xt commented 1 year ago

Some thoughts about the API design and keeping backward compatibility.

Events emitted by the context go into the event channel and, if enabled, to the debug logging XDC. I have looked into debug logging and made some fixes in #4329 as preparation for persistent events. If persistent events are enabled, they will be written to the database from Context.emit_event() too, so I started by cleaning it up.

The account manager has the ability to emit events with account_id=0: https://github.com/deltachat/deltachat-core-rust/blob/5403fd849cfb0f6f93e4008705ac4a993c2cd923/src/accounts.rs#L286-L289 These events cannot be persisted. Fortunately, it seems the only use of this is to emit an info event when I/O is stopped: https://github.com/deltachat/deltachat-core-rust/blob/5403fd849cfb0f6f93e4008705ac4a993c2cd923/src/accounts.rs#L266 We only need to make dc_get_next_event() return some info event after a call to dc_accounts_stop_io() to keep backward compatibility. This code can be kept as is, emitting this info event only into the event channel. There is no attempt to emit account manager logs into the debug logging XDC, and there is no need to try to write them into the persistent event table.

EDIT: there is some usage of the account manager event channel in deltachat-ffi: it attempts to write FFI call errors into the event emitter. This is not a problem for the JSON-RPC API, as it receives errors as return values. We just need to drop these events.

Persistent events cannot be used to log anything before the database is open or if opening the database fails. This is fine for the JSON-RPC API: there we can return rich errors describing what happened on failure, and otherwise there is no need to log anything or emit events.

We are going to have three event targets:

  1. Event channel with event emitter. This is used by dc_get_next_event() and to emit notifications in deltachat-rpc-server. This is an unreliable interface, as it may drop events if they are not read from the channel quickly enough and it overflows. We keep it as the default for now but discourage the use of notifications in the JSON-RPC bindings. We may drop events_task from deltachat-rpc-server after some releases, once all bindings have switched to the new APIs.
  2. Debug logging XDC, enabled by dropping "debug_logging*.xdc" into Saved Messages chat.
  3. New "event queues". Unlike the event channel, which is global for all accounts, there is one event queue per context. It is used by calling the Context.wait_next_events(last_event_id: i64) -> Vec<Event> API. Whether the queue is in-memory or persisted into the database is determined when the Context is created and cannot be changed afterwards, to ensure that event IDs are always increasing.

The following choices are mutually exclusive; the application should decide which one to use at account manager creation:

  1. Event channel and JSON-RPC notifications.
  2. In-memory event queues.
  3. Persistent event queues.

For deltachat-rpc-server I am thinking about an environment variable set to either DC_EVENTS=memory or DC_EVENTS=persistent, with the default of using the event channel. I have not yet decided how to make the API nice on the Rust side.
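The proposed selection could be parsed like this on startup (Python sketch; the variable name and values come from the paragraph above, but the function itself is hypothetical):

```python
import os

def event_target_from_env():
    """Map the proposed DC_EVENTS variable to one of the three
    mutually exclusive event targets (hypothetical helper)."""
    value = os.environ.get("DC_EVENTS")
    if value is None:
        return "channel"  # default: event channel + JSON-RPC notifications
    if value in ("memory", "persistent"):
        return value      # in-memory or persistent event queues
    raise ValueError(f"unknown DC_EVENTS value: {value!r}")
```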

flub commented 1 year ago

> Events are stored in memory and may be dropped if the process crashes or if the channel overflows.

link2xt commented 1 year ago

> Is the process crashing really an issue? Relying on events across restarts seems sketchy.

The bot may be restarted right before processing an event that is still in the queue. It should respond to the message when it is restarted. This does not have to be a crash, may be a server reboot, bot update or deltachat-rpc-server update.

> I guess you may as well re-read the chats and use their unread state.

I have already written about this in the fourth paragraph of the first post. Relying on the unread state does not allow you to find messages which got new reactions or changed their status, e.g. received a non-delivery report, or to react to changes to a user or group profile, e.g. if you want to react to changes of the user's display name. Otherwise the new API from #4319 is good enough.

The goal is to have a single event stream that the bot has to go through. This is what bot developers are used to from other APIs such as the Telegram Bot API, where the bot goes through a single feed of "updates" which are guaranteed to be persisted for at least 24 hours. This is much easier for developers than re-reading all the chats, checking which messages have already been replied to, and at the same time receiving events telling you to check some chats again.

> If the process can't keep up with events that are stored in memory, will piling them up into the database really be the right response? How will it keep up with them? Maybe it needs a little more buffer than the default 1000 messages to handle bursts, but is persistence really needed?

I do not consider the case where the bot is constantly overloaded. Queue overflow may still easily happen if the bot is restarted after a week of inactivity and there is a large queue of users waiting for the bot to start answering their ChatGPT prompts. And there is a high chance that the bot will be restarted again shortly after starting if the admin notices a configuration problem that requires a restart. Dropping unprocessed events in this case is not nice, as the bot will then ignore most of the queued-up requests.

link2xt commented 1 year ago

@adbenitez

> about persistent event processing, I am not sure about them unless the state is consistent, or the events contain enough info, e.g. how useful is it to a Matrix bridge to know "chat changed" but not what changed, when by the time you are processing that event core has internally already processed 3 more group name changes that are in the queue, so you can't really map all the group changes to the Matrix bridge side

We can introduce as many events as needed and add all the data that the bot needs there. I also do not care if legacy clients cannot access some of the attached data: for example, ReactionChanged contains chat_id, msg_id and contact_id, but only chat_id and msg_id are exposed via data1 and data2. To get the contact ID you need to read the JSON.

It is ok, for example, to add a new event ContactNameChanged and put both the new and the old name in there, so you know exactly what changed even if by the time you process the event the name has been changed again or the contact was deleted.
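Such a self-contained event could look like this (hypothetical payload; ContactNameChanged does not exist yet, and all field names here are illustrative):

```python
# Hypothetical ContactNameChanged payload: carrying both the old and
# the new name keeps the event meaningful even if the contact has been
# renamed again, or deleted, by the time the bot processes it.
event = {
    "kind": "ContactNameChanged",  # hypothetical, not an existing core event
    "contact_id": 42,
    "old_name": "Alice",
    "new_name": "Alice (work)",
}
```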

If these events are not interesting to UIs, we may add an API to select a subset of events, but I am not going to do this yet; we can always introduce it later while keeping "all events" or "all events at the time filters were introduced" as the default.

link2xt commented 1 year ago

After more coding and experimenting, I think we will only guarantee no event loss in case of graceful shutdown (#4306 #4323 #4330). We cannot promise that an event is not lost on process crash, because this would require writing each event and the corresponding change in the same transaction, to ensure that the process cannot crash after making a change but before emitting the event about it. And this is not efficient, as we would write each event in its own transaction, always flushing to disk. In addition, writing to the database from the synchronous Context.emit_event() without blocking other tasks is not easy: spawning a task or using block_in_place for each event looks wrong even if it would likely work.

So instead I will send events to a task started at Context creation that writes events to the database in batches: it reads as many events as possible out of the channel, then writes them all in a single transaction. This way we both write events to the database and delete them from the database in batches. If the process crashes without proper shutdown, events still stored in memory are lost. Maybe we will even only store events on shutdown, because most events are going to be processed almost immediately after arrival.
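The batching writer can be sketched like this (Python with an in-memory queue and SQLite; the real implementation is a Rust task reading from a channel, so all names here are illustrative):

```python
import queue
import sqlite3

def flush_events(q, db):
    # Drain everything currently buffered in memory...
    batch = []
    while True:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break
    # ...and write the whole batch in one transaction, not one per event.
    if batch:
        with db:  # connection as context manager = a single transaction
            db.executemany("INSERT INTO events (payload) VALUES (?)",
                           [(e,) for e in batch])
    return len(batch)

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
q = queue.Queue()
for e in ("IncomingMsg", "MsgsChanged", "IncomingMsg"):
    q.put(e)
written = flush_events(q, db)
```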

So there will be no separate "in-memory" and "persistent" variants; events will be persisted in case of a bot crash or disconnection from the core, but not in case of a core or system crash.

If the bot absolutely has to make sure it cannot lose a downloaded message, it will have to use the #4319 API, which is not going away; but that API is limited to processing messages, and a message may still be lost to a spam filter, an unreliable SMTP connection etc. For most use cases, events persisted on shutdown should be enough.

link2xt commented 1 year ago

Current plan is to replace this channel with a different multi-queue channel: https://github.com/deltachat/deltachat-core-rust/blob/4716fcef940bc15d57a9c60a33e03edf947b2507/src/events.rs#L14-L19

This new data structure should still have the ability to extract some event, to support get_next_event(). The order of events belonging to different contexts may change; we do not promise any fairness. But in addition, it should allow requesting all events from a sub-queue identified by the account ID. When the context is dropped, or right before that, it should extract all events identified by its ID and put them into the database. When the context is created and attached to the channel, it should extract the events from the database and put them into the channel.
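A minimal model of such a multi-queue channel (Python sketch of the data structure only; the real one would be an async Rust channel, and the method names here are illustrative):

```python
from collections import deque

class MultiQueue:
    """One FIFO sub-queue per account, with the two extra operations
    described above: take any event (for get_next_event(), no fairness
    promised) and drain one account (for context shutdown)."""

    def __init__(self):
        self.queues = {}  # account_id -> deque of events

    def push(self, account_id, event):
        self.queues.setdefault(account_id, deque()).append(event)

    def pop_any(self):
        # Order across accounts is unspecified; within one account, FIFO.
        for q in self.queues.values():
            if q:
                return q.popleft()
        return None

    def drain_account(self, account_id):
        # Extract all events of this account so they can be written
        # into that account's database before the context is dropped.
        return list(self.queues.pop(account_id, deque()))

mq = MultiQueue()
mq.push(1, "a1")
mq.push(2, "b1")
mq.push(1, "a2")
first = mq.pop_any()        # some event; here "a1" in CPython's dict order
rest = mq.drain_account(1)  # remaining events of account 1, in FIFO order
```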

Currently we are using a bounded channel from the async-channel crate.

link2xt commented 1 year ago

For reference here is a PR which removed custom implementation of event emitter that polled several event channels: https://github.com/deltachat/deltachat-core-rust/pull/3421