vlcn-io / cr-sqlite

Convergent, Replicated SQLite. Multi-writer and CRDT support for SQLite
https://vlcn.io
MIT License

Native networking layer #38

Open tantaman opened 1 year ago

tantaman commented 1 year ago

We'll start by creating a new set of networking primitives atop the existing crsql_changes table. These higher-level primitives will make authoring network layers simpler.

The new primitives are:

  1. InboundStream
  2. OutboundStream

An inbound stream represents a stream of changes coming from a remote database into the local database.

An outbound stream represents a stream of changes going from the local database to a remote database.

These streams will be implemented as virtual tables. Unlike crsql_changes, which is an eponymous virtual table, these will be virtual tables that must be explicitly instantiated, one per remote.

E.g.,

CREATE VIRTUAL TABLE out_to_peer_a USING crsql_outbound_stream(remote = peer_a_id);
CREATE VIRTUAL TABLE in_from_peer_a USING crsql_inbound_stream(remote = peer_a_id);
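When SQLite runs `CREATE VIRTUAL TABLE ... USING module(...)`, it passes each comma-separated module argument to the module's xCreate as a separate string, after the module, database, and table names. A minimal sketch of how the stream modules might interpret `remote = peer_a_id`, assuming a simple `key = value` convention; `parse_stream_args` and `remote_of` are hypothetical helpers, not existing cr-sqlite code.

```rust
use std::collections::HashMap;

/// Hypothetical helper: interpret the module arguments of
/// `USING crsql_outbound_stream(remote = peer_a_id)` as `key = value` pairs.
/// Each element of `args` is one comma-separated argument as SQLite passes it.
fn parse_stream_args(args: &[&str]) -> Result<HashMap<String, String>, String> {
    let mut out = HashMap::new();
    for arg in args {
        // Split on the first '=' only, so values may themselves contain '='.
        let (key, value) = arg
            .split_once('=')
            .ok_or_else(|| format!("expected `key = value`, got `{arg}`"))?;
        let key = key.trim().to_lowercase();
        if out.insert(key.clone(), value.trim().to_string()).is_some() {
            return Err(format!("duplicate argument `{key}`"));
        }
    }
    Ok(out)
}

/// Hypothetical: the one argument both stream examples require.
fn remote_of(args: &[&str]) -> Result<String, String> {
    parse_stream_args(args)?
        .remove("remote")
        .ok_or_else(|| "missing required `remote` argument".to_string())
}
```

With this convention, `remote_of(&["remote = peer_a_id"])` yields `peer_a_id`, and a table created without a `remote` argument fails at creation time rather than on first use.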

The Inbound/Outbound streams will assume in-order delivery (primitives for out-of-order delivery can follow later). They will manage the stream's bookkeeping to ensure that changes are sent and received in order, that streams can be resumed, and that only incremental changes are delivered.

Tasks:

  1. [ ] Create a stub virtual table for crsql_outbound_stream. Just the wiring; don't worry about the implementation yet.
    1. Here is an example of creating a vtab in Rust: https://github.com/vlcn-io/cr-sqlite/blob/main/core/rs/core/src/create_cl_set_vtab.rs
  2. [ ] Repeat for crsql_inbound_stream
  3. [ ] Implement the ability to SELECT from the outbound stream. See "Select From Outbound Stream" below.
  4. [ ] Implement the ability to INSERT into the inbound stream

Select From Outbound Stream

SELECT change, cursor FROM outbound_stream WHERE cursor > :last_cursor

The outbound stream is only queryable by cursor. The change column contains all values that describe the change, packed together in a binary format. We can re-use this logic for packing columns: https://github.com/vlcn-io/cr-sqlite/blob/main/core/rs/core/src/pack_columns.rs
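For illustration only, here is one way to pack a change's column values into a single self-describing blob. The tag bytes, big-endian integers, and u32 length prefixes are assumptions of this sketch; the actual format in pack_columns.rs may differ, and the `Value`, `pack`, and `unpack` names are hypothetical.

```rust
/// A column value, mirroring SQLite's five storage classes.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Integer(i64),
    Real(f64),
    Text(String),
    Blob(Vec<u8>),
}

// Hypothetical type tags; not taken from pack_columns.rs.
const TAG_NULL: u8 = 0;
const TAG_INT: u8 = 1;
const TAG_REAL: u8 = 2;
const TAG_TEXT: u8 = 3;
const TAG_BLOB: u8 = 4;

fn push_len_prefixed(buf: &mut Vec<u8>, bytes: &[u8]) {
    buf.extend_from_slice(&(bytes.len() as u32).to_be_bytes());
    buf.extend_from_slice(bytes);
}

/// Layout: u32 value count, then per value a tag byte and its payload.
fn pack(values: &[Value]) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(&(values.len() as u32).to_be_bytes());
    for v in values {
        match v {
            Value::Null => buf.push(TAG_NULL),
            Value::Integer(i) => {
                buf.push(TAG_INT);
                buf.extend_from_slice(&i.to_be_bytes());
            }
            Value::Real(f) => {
                buf.push(TAG_REAL);
                buf.extend_from_slice(&f.to_be_bytes());
            }
            Value::Text(s) => {
                buf.push(TAG_TEXT);
                push_len_prefixed(&mut buf, s.as_bytes());
            }
            Value::Blob(b) => {
                buf.push(TAG_BLOB);
                push_len_prefixed(&mut buf, b);
            }
        }
    }
    buf
}

/// Split `n` bytes off the front of `input`, failing on truncated data.
fn take<'a>(input: &mut &'a [u8], n: usize) -> Result<&'a [u8], String> {
    let slice: &'a [u8] = *input;
    if slice.len() < n {
        return Err("truncated payload".to_string());
    }
    let (head, tail) = slice.split_at(n);
    *input = tail;
    Ok(head)
}

fn take_array<const N: usize>(input: &mut &[u8]) -> Result<[u8; N], String> {
    Ok(take(input, N)?.try_into().expect("take returned exactly N bytes"))
}

fn unpack(mut input: &[u8]) -> Result<Vec<Value>, String> {
    let count = u32::from_be_bytes(take_array::<4>(&mut input)?);
    let mut values = Vec::new(); // no with_capacity: `count` is untrusted
    for _ in 0..count {
        let tag = take(&mut input, 1)?[0];
        let value = match tag {
            TAG_NULL => Value::Null,
            TAG_INT => Value::Integer(i64::from_be_bytes(take_array::<8>(&mut input)?)),
            TAG_REAL => Value::Real(f64::from_be_bytes(take_array::<8>(&mut input)?)),
            TAG_TEXT | TAG_BLOB => {
                let len = u32::from_be_bytes(take_array::<4>(&mut input)?) as usize;
                let bytes = take(&mut input, len)?.to_vec();
                if tag == TAG_TEXT {
                    Value::Text(String::from_utf8(bytes).map_err(|e| e.to_string())?)
                } else {
                    Value::Blob(bytes)
                }
            }
            other => return Err(format!("unknown tag {other}")),
        };
        values.push(value);
    }
    if !input.is_empty() {
        return Err("trailing bytes after last value".to_string());
    }
    Ok(values)
}
```

Tagging each value keeps the blob opaque to the transport while still letting the receiver reconstruct SQLite's storage classes exactly.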

The underlying implementation delegates to crsql_changes.
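Assuming the cursor is the (db_version, seq) pair of a crsql_changes row (the same pair as the since_version: [version, seq] parameter proposed later in this thread), the query's semantics can be modeled in memory. `Change`, `Cursor`, and `select_after` are hypothetical; Rust tuples compare lexicographically, which gives the total order a cursor needs.

```rust
/// Hypothetical stand-in for a crsql_changes row.
#[derive(Debug, Clone, PartialEq)]
struct Change {
    db_version: i64,
    seq: i32,
    packed: Vec<u8>, // the row's other columns, packed into one blob
}

/// Hypothetical cursor: (db_version, seq), compared lexicographically.
type Cursor = (i64, i32);

/// Models `SELECT change, cursor FROM outbound_stream WHERE cursor > :last_cursor`,
/// returning rows in cursor order so a receiver sees them in sequence.
fn select_after(log: &[Change], last_cursor: Cursor) -> Vec<(Vec<u8>, Cursor)> {
    let mut rows: Vec<(Vec<u8>, Cursor)> = log
        .iter()
        .filter(|c| (c.db_version, c.seq) > last_cursor)
        .map(|c| (c.packed.clone(), (c.db_version, c.seq)))
        .collect();
    rows.sort_by_key(|(_, cursor)| *cursor);
    rows
}
```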

Persist the Outbound Stream Position

Provide hidden columns for last_sent and last_retrieved cursors? These columns allow resumption of an outbound stream if the connection closes, process restarts, peer returns an error, etc.
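A minimal sketch of what the two hidden columns might track, assuming a plain integer cursor for brevity. `OutboundPosition` and its methods are hypothetical; in a real implementation these values would be persisted in a table so they survive a process restart, whereas here they live in a struct.

```rust
/// Hypothetical bookkeeping for one outbound stream: the values the proposed
/// hidden `last_retrieved` / `last_sent` columns would hold.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
struct OutboundPosition {
    /// Highest cursor handed to the caller by a SELECT.
    last_retrieved: u64,
    /// Highest cursor the caller confirmed was delivered to the peer.
    last_sent: u64,
}

impl OutboundPosition {
    fn on_retrieved(&mut self, cursor: u64) {
        self.last_retrieved = self.last_retrieved.max(cursor);
    }

    /// Record a delivery confirmation; a cursor never retrieved is rejected.
    fn on_sent(&mut self, cursor: u64) -> Result<(), String> {
        if cursor > self.last_retrieved {
            return Err(format!("cursor {cursor} was never retrieved"));
        }
        self.last_sent = self.last_sent.max(cursor);
        Ok(())
    }

    /// After a closed connection, restart, or peer error, anything retrieved
    /// but not confirmed may be lost, so resume right after `last_sent`.
    /// The returned value is what to bind as `:last_cursor`.
    fn resume(&mut self) -> u64 {
        self.last_retrieved = self.last_sent;
        self.last_sent
    }
}
```

Resuming from `last_sent` rather than `last_retrieved` means some changes may be sent twice, but none are skipped.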

Insert Into Inbound Stream

INSERT INTO inbound_stream (change, cursor) VALUES (?, ?);

todo

tantaman commented 1 year ago

#94 is our first pass at a production-grade TypeScript networking layer.

tantaman commented 1 year ago

Current thoughts are that the native networking layer will be written in Rust (given cr-sqlite is being migrated to Rust).

It'll support WebSockets so that it works with the existing server implementation.

For p2p, I'll need to investigate libp2p and other options.

tantaman commented 1 year ago

This may be resolved by merging with sqld & libsql and adding the required endpoints for merging. tbd.

tantaman commented 1 year ago

ok.. so after writing 5 different implementations 😬 I think we're ready to do a native one.

Background

The complexity of a networking layer for cr-sqlite comes from the fact that there are so many possible ways to sync. You can sync client-server or p2p. You can sync with in-order or out-of-order delivery. You can also combine these methods.

Also -- since cr-sqlite is embedded -- people generally want to run their networking layer over their existing APIs or in their existing servers. E.g., adding endpoints to their REST APIs, adding message types to their existing websocket server, using the same nodejs/netty/tokio server to sync, etc.

Traditional DBs don't have this problem given they aren't embedded.

Most setups I've run into are fine with in-order delivery. fly.io is the one use case that uses out-of-order delivery, but they've already rolled their own networking layer, which will eventually be open sourced.

Plan

Given:

My plan is to provide inbound and outbound stream abstractions, in native code, to vastly simplify network layer creation.

Current Concerns

When someone writes their own networking layer they must track:

Inbound and Outbound streams will handle these for the developer and will be exposed as new SQLite methods so that they can be embedded into, and invoked from, any language the user desires.

OutboundStream

This is an abstraction over SELECT * FROM crsql_changes WHERE ... that, in addition to pulling changes, does all the required bookkeeping to ensure.

Potential API:

// create the stream
stream = db.create_outbound_stream(since_version: [version, seq], excludeSites?, localOnly?);

// advance the stream
stream.next()

Creating the stream, the user specifies:

  1. The version at which to begin the stream
  2. Optional set of sites to exclude from the change log. Usually this would be the receiving site id.
  3. Whether to include only local changes
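The three creation options could combine into a single per-change filter. A sketch, with hypothetical `StreamOptions` and `ChangeRow` types, a `u32` standing in for a site id, and `since_version` treated as exclusive (matching `cursor > :last_cursor`); the proposal does not say whether it is inclusive.

```rust
use std::collections::HashSet;

/// Hypothetical: a u32 stands in for a site id.
type SiteId = u32;

/// Hypothetical row shape: just the fields the filter needs.
struct ChangeRow {
    version: (i64, i32), // (db_version, seq)
    site_id: SiteId,     // site that originally made the change
}

/// Mirrors create_outbound_stream(since_version, excludeSites?, localOnly?).
struct StreamOptions {
    since_version: (i64, i32),
    exclude_sites: HashSet<SiteId>, // usually just the receiver's site id
    local_only: bool,
}

/// Should this change be sent on a stream created with `opts`?
fn qualifies(opts: &StreamOptions, local_site: SiteId, row: &ChangeRow) -> bool {
    row.version > opts.since_version // treated as exclusive (an assumption)
        && !opts.exclude_sites.contains(&row.site_id)
        && (!opts.local_only || row.site_id == local_site)
}
```

Excluding the receiver's site avoids echoing a peer's own changes back to it, which is why that is the usual value for the exclusion set.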

The user then advances the stream at their leisure. This is to allow users with realtime use cases to sync often and users with different use cases to sync less frequently.

next will pull all changes that have happened since the last time next was called and return them to the caller.

So this isn't quite a networking abstraction -- the user will still need to pass those changes over their own transport.

One thing to work out is how to handle the case where next returns a very large chunk. next should either:

Also -- the user would want to be able to ack the result of next so that the stream's internal pointer is not updated until the changes have been sent over the wire.

Given that, next should return some sort of object for:

  1. Getting the current grouping of changes pointed at
  2. Acking / finalizing that set so bookkeeping can happen
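A sketch of the next/ack split, under assumptions: the change log is an in-memory slice sorted ascending by an integer cursor, and `OutboundStream`, `Batch`, `next`, and `ack` are hypothetical names. Because the pointer only moves in `ack`, calling `next` again before acking returns the same changes, which covers a send that failed mid-flight.

```rust
/// Hypothetical in-memory outbound stream.
struct OutboundStream<'a> {
    log: &'a [(u64, Vec<u8>)], // (cursor, packed change), ascending by cursor
    acked: u64,                // advanced only by `ack`
}

/// What `next` hands back: the changes plus the cursor needed to finalize them.
#[derive(Debug)]
struct Batch {
    changes: Vec<Vec<u8>>,
    end_cursor: u64,
}

impl<'a> OutboundStream<'a> {
    fn new(log: &'a [(u64, Vec<u8>)], since: u64) -> Self {
        Self { log, acked: since }
    }

    /// Everything after the last *acked* cursor; None if there is nothing new.
    fn next(&self) -> Option<Batch> {
        let pending: Vec<&(u64, Vec<u8>)> =
            self.log.iter().filter(|(c, _)| *c > self.acked).collect();
        let end_cursor = pending.last()?.0; // log is sorted, so last is max
        Some(Batch {
            changes: pending.iter().map(|(_, p)| p.clone()).collect(),
            end_cursor,
        })
    }

    /// Finalize a batch once it is on the wire: only now does the pointer move.
    fn ack(&mut self, batch: Batch) {
        self.acked = self.acked.max(batch.end_cursor);
    }
}
```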

InboundStream

stream = db.create_inbound_stream(from: site_id);
stream.applyChanges(results_from_outbound_stream);

Given the rate of sending is controlled by the sender, there isn't much to do with InboundStream other than create it and stop it when desired.

The output of an OutboundStream is meant to be fed directly into an InboundStream. The inbound stream will ensure that received changes are contiguous and do the bookkeeping so it can be restarted at future points in time.
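One way the inbound side could enforce contiguity is for each payload to carry the cursor it was computed from and the last cursor it contains. A sketch with hypothetical `Payload`, `InboundStream`, and `apply_changes` types; it assumes re-applying an already-seen change is harmless (merges being idempotent), so overlapping batches are accepted while gaps are rejected.

```rust
/// Hypothetical envelope an outbound stream could wrap each batch in.
#[derive(Debug, Clone, PartialEq)]
struct Payload {
    from_cursor: u64,      // the batch contains changes strictly after this cursor
    to_cursor: u64,        // ...up to and including this one
    changes: Vec<Vec<u8>>, // packed changes, opaque to the transport
}

#[derive(Debug, PartialEq)]
enum ApplyOutcome {
    Applied(usize), // number of changes handed to the database
    Duplicate,      // everything in the batch was already applied
}

/// Hypothetical inbound stream from one remote site.
struct InboundStream {
    from_site: String,
    last_applied: u64,     // would be persisted so the stream can restart later
    applied: Vec<Vec<u8>>, // stand-in for inserting into crsql_changes
}

impl InboundStream {
    fn new(from_site: &str) -> Self {
        Self { from_site: from_site.to_string(), last_applied: 0, applied: Vec::new() }
    }

    fn apply_changes(&mut self, p: Payload) -> Result<ApplyOutcome, String> {
        if p.to_cursor <= self.last_applied {
            // A resend of something we already have.
            return Ok(ApplyOutcome::Duplicate);
        }
        if p.from_cursor > self.last_applied {
            // Changes between last_applied and from_cursor never arrived.
            return Err(format!(
                "gap from {}: have up to {}, batch starts after {}",
                self.from_site, self.last_applied, p.from_cursor
            ));
        }
        // from_cursor <= last_applied < to_cursor: contiguous, possibly overlapping.
        // Re-applying the overlap is assumed harmless (idempotent merges).
        let n = p.changes.len();
        self.applied.extend(p.changes);
        self.last_applied = p.to_cursor;
        Ok(ApplyOutcome::Applied(n))
    }
}
```

Reporting a gap as an error, instead of silently applying, gives the caller a clear signal to restart the sender from the inbound stream's `last_applied`.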

Payloads

Inbound & Outbound stream payloads will be serialized as opaque binary blobs. The user shouldn't need to interact with them other than to send them over a network connection.

Some transports do not support binary data, so in those cases we should allow JSON-encoding the payloads as a configuration option of the inbound/outbound stream.
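A sketch of the JSON fallback, assuming the blob is carried as a hex string inside a minimal `{"payload":"..."}` envelope. The envelope shape, `PayloadEncoding`, `encode`, and `decode` are hypothetical; hex is used only because Rust's standard library has no base64, and a real implementation would likely use a JSON library and a more compact encoding.

```rust
/// Hypothetical stream configuration option.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PayloadEncoding {
    Binary,
    Json,
}

/// What actually goes over the transport.
#[derive(Debug, PartialEq)]
enum Wire {
    Bytes(Vec<u8>),
    Text(String),
}

fn encode(payload: &[u8], enc: PayloadEncoding) -> Wire {
    match enc {
        PayloadEncoding::Binary => Wire::Bytes(payload.to_vec()),
        PayloadEncoding::Json => {
            // Hex keeps the JSON string ASCII-only, so no escaping is needed.
            let hex: String = payload.iter().map(|b| format!("{b:02x}")).collect();
            Wire::Text(format!("{{\"payload\":\"{hex}\"}}"))
        }
    }
}

fn decode(wire: &Wire) -> Result<Vec<u8>, String> {
    match wire {
        Wire::Bytes(b) => Ok(b.clone()),
        Wire::Text(t) => {
            // Only understands the exact envelope `encode` produces.
            let hex = t
                .strip_prefix("{\"payload\":\"")
                .and_then(|rest| rest.strip_suffix("\"}"))
                .ok_or("not a payload envelope")?;
            if hex.len() % 2 != 0 || !hex.bytes().all(|c| c.is_ascii_hexdigit()) {
                return Err("payload is not valid hex".to_string());
            }
            (0..hex.len())
                .step_by(2)
                .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).map_err(|e| e.to_string()))
                .collect()
        }
    }
}
```

Either way the receiving inbound stream gets back the same opaque bytes, so the encoding choice stays a transport concern.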

tantaman commented 1 year ago

Once the Inbound and Outbound stream abstractions are in place I'll update one or more of the PartyKit, P2PRTC, and direct-connect examples to use them as transports, and then decide on what a default native transport would look like.

tantaman commented 1 year ago

These new primitives (stateful Inbound and Outbound streams) could be exposed over SQL so as to not require custom bindings.

I'm thinking as virtual tables.

CREATE VIRTUAL TABLE stream_out_to_[server_id] USING crsql_OutboundStream (
  since = ..., -- exposed in case they don't want to start from zero 🤷‍♂️
  exclude_sites = [server_id]
);

SELECT package, cursor FROM stream_out_to_[server_id] WHERE cursor > ?