Open KyleAMathews opened 3 days ago
I think we want this to be a declarative list of subscribers, so something like:
type ShapeStreamSubscriber = {
onMessages: (messages: Message[]) => void | Promise<void>,
onError: (err: unknown) => void
}
subscribers: ShapeStreamSubscriber[] | ShapeStreamSubscriber
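To make the proposal concrete, here is a minimal sketch of a stream that takes its subscribers at construction time and never changes them afterwards. `DeclarativeStream`, `normalizeSubscribers` and `publish()` are hypothetical names used only for illustration, and the `Message` shape is an assumption; this is not the real client API.

```typescript
// Hypothetical sketch: DeclarativeStream and publish() are illustrative
// names, not part of the real client.
type Message = { offset: number; value: unknown }

type ShapeStreamSubscriber = {
  onMessages: (messages: Message[]) => void | Promise<void>
  onError: (err: unknown) => void
}

type SubscribersOption = ShapeStreamSubscriber[] | ShapeStreamSubscriber

// Accept a single subscriber or a list, and freeze the result so the set of
// subscribers is fixed once the stream has been constructed.
function normalizeSubscribers(
  subscribers: SubscribersOption
): readonly ShapeStreamSubscriber[] {
  const list = Array.isArray(subscribers) ? subscribers : [subscribers]
  return Object.freeze([...list])
}

class DeclarativeStream {
  readonly subscribers: readonly ShapeStreamSubscriber[]

  constructor(options: { subscribers: SubscribersOption }) {
    this.subscribers = normalizeSubscribers(options.subscribers)
  }

  // Stand-in for a batch of messages arriving from the server.
  publish(messages: Message[]): void {
    for (const sub of this.subscribers) {
      try {
        // Handles both synchronous and promise-returning handlers.
        Promise.resolve(sub.onMessages(messages)).catch((err) => sub.onError(err))
      } catch (err) {
        sub.onError(err) // the handler threw synchronously
      }
    }
  }
}
```

Because the list is frozen in the constructor, there is no point in time at which a late subscriber could attach and miss earlier messages.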
Is there not a slight chicken and egg thing of needing to instantiate a Shape to have the subscribers to instantiate the ShapeStream which is needed to instantiate the Shape ...?
The alternative is an explicit .start() method on a shape stream. It solves two issues:
.start() would return a promise that resolves/fails depending on initial connection - such as auth failing. This is impossible to do with the sync constructor (the alternative is an async static .create() but that doesn't solve lazy subscription)
After calling .start() we can lock the subscriptions and throw on any further .subscribe() calls. (Although there is an argument for late subscription for power users and edge cases...)
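A rough sketch of what the explicit .start() alternative could look like, assuming a hypothetical `StartableStream` whose `connect` callback stands in for the first request to the server. None of these names come from the real client.

```typescript
// Hypothetical sketch: StartableStream and its connect callback are
// illustrative, not the real ShapeStream API.
type MessageHandler = (messages: unknown[]) => void

class StartableStream {
  private readonly handlers: MessageHandler[] = []
  private readonly connect: () => Promise<void>
  private started = false

  // `connect` stands in for the first request to the server.
  constructor(connect: () => Promise<void>) {
    this.connect = connect
  }

  subscribe(handler: MessageHandler): void {
    if (this.started) {
      throw new Error("subscribe() called after start(); subscriptions are locked")
    }
    this.handlers.push(handler)
  }

  // Resolves once the initial connection succeeds and rejects if it fails,
  // for example because authentication was refused.
  async start(): Promise<void> {
    this.started = true // lock subscriptions before any message can flow
    await this.connect()
  }

  get subscriberCount(): number {
    return this.handlers.length
  }
}
```

The lock is set synchronously at the top of start(), so a subscribe() call issued right after start() already throws, even while the connection is still pending.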
Is there not a slight chicken and egg thing of needing to instantiate a Shape to have the subscribers to instantiate the ShapeStream which is needed to instantiate the Shape ...?
This is part of the problem in how we currently use ShapeStream - as soon as we instantiate it, it starts streaming, which means that if you "feed it" to a Shape asynchronously it will have missed messages and have inconsistent state. Passing an ongoing stream to a shape to materialize it is odd, since the materialization requires the shape to have access to the stream from the start (and that is not guaranteed when being passed an instantiated stream).
The way to do it with the declarative approach would be that Shape actually accepts Omit<ShapeStreamOptions, 'subscribers'> rather than a ShapeStream instance, and it internally creates a shape stream with itself as a subscriber.
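As a sketch of that idea, under the assumption of a simplified message format: `StubShapeStream` below is a stand-in for the real stream, `emit()` simulates messages arriving from the server, and the `Row` / `Message` shapes are invented for the example.

```typescript
// Hypothetical sketch: StubShapeStream stands in for the real stream, and
// emit() simulates messages arriving from the server.
type Row = { id: string; [column: string]: unknown }
type Message = { operation: "insert" | "update" | "delete"; row: Row }

type Subscriber = {
  onMessages: (messages: Message[]) => void
  onError: (err: unknown) => void
}

type ShapeStreamOptions = {
  url: string
  table: string
  subscribers: Subscriber[]
}

class StubShapeStream {
  private readonly options: ShapeStreamOptions

  constructor(options: ShapeStreamOptions) {
    this.options = options
  }

  emit(messages: Message[]): void {
    for (const sub of this.options.subscribers) sub.onMessages(messages)
  }
}

class Shape {
  readonly rows = new Map<string, Row>()
  readonly stream: StubShapeStream
  lastError: unknown = undefined

  // Callers cannot pass subscribers: the shape registers itself as the only
  // subscriber, so it sees the stream from the very first message.
  constructor(options: Omit<ShapeStreamOptions, "subscribers">) {
    this.stream = new StubShapeStream({
      ...options,
      subscribers: [
        {
          onMessages: (messages) => this.apply(messages),
          onError: (err) => {
            this.lastError = err
          },
        },
      ],
    })
  }

  // Materialize the change log into the current set of rows.
  private apply(messages: Message[]): void {
    for (const { operation, row } of messages) {
      if (operation === "delete") {
        this.rows.delete(row.id)
      } else {
        this.rows.set(row.id, { ...this.rows.get(row.id), ...row })
      }
    }
  }
}
```

The `stream` property is exposed here only so the sketch can be driven by hand; in a real design it could stay private.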
There is also another way, using a sort of ShapeStreamBuilder, see below:
The alternative is an explicit .start() method on a shape stream.
We had a discussion around this with @KyleAMathews @kevin-dp @balegas - currently we are mixing two patterns, a declarative one and a builder one.
If we opt for the fully declarative one as the core pattern, then we can very easily code up a ShapeStreamBuilder that uses a builder pattern, where you would do something like:
const builder = new ShapeStreamBuilder()
  .setUrl(url)
  .setShapeDefinition(shapeDef)
  .addSubscriber(sub)
// ... stuff happens ...
builder.addSubscriber(sub)
const shapeStream = builder.build() // or .run()
// internally the build call would create a declarative `ShapeStream`
This solves the cases where people might want to lazily add subscribers without 1) having unclear semantics as to when the stream starts or whether they can add more subscribers after etc, 2) having any externally mutable state on the stream itself and having to deal with special cases and flags and what not.
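A minimal sketch of such a builder, assuming that build() simply snapshots the accumulated options. The method names follow the ones used in this thread, but the `StreamConfig` type and the string shape definition are assumptions, and build() returns a frozen config here rather than a running stream.

```typescript
// Hypothetical sketch: the builder methods mirror the names used in this
// thread, but build() here only returns a frozen config object.
type Subscriber = (messages: unknown[]) => void

type StreamConfig = {
  readonly url: string
  readonly shapeDefinition: string
  readonly subscribers: readonly Subscriber[]
}

class ShapeStreamBuilder {
  private url: string | undefined
  private shapeDefinition: string | undefined
  private readonly subscribers: Subscriber[] = []

  setUrl(url: string): this {
    this.url = url
    return this
  }

  setShapeDefinition(shapeDefinition: string): this {
    this.shapeDefinition = shapeDefinition
    return this
  }

  addSubscriber(subscriber: Subscriber): this {
    this.subscribers.push(subscriber)
    return this
  }

  // Snapshot the mutable builder state into an immutable config. Anything
  // added to the builder afterwards does not affect configs already built.
  build(): StreamConfig {
    if (this.url === undefined || this.shapeDefinition === undefined) {
      throw new Error("url and shape definition must be set before build()")
    }
    return Object.freeze({
      url: this.url,
      shapeDefinition: this.shapeDefinition,
      subscribers: Object.freeze([...this.subscribers]),
    })
  }
}
```

All mutable state lives on the builder, so the thing that build() hands back has a fixed set of subscribers and no "has it started yet" flag to reason about.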
catching exceptions during the initial connection
We had discussed that this might not make that much sense, since a stream is a continuous thing and you might get auth errors or invalid relation errors at any point during the runtime of the stream - so separating the "initial connection" is extra overhead that could be handled by one generic declarative global onError handler.
We can have a static .validateShape API that is a single async call that ensures you can create the stream and everything is correct if that is important as a step, or we can have a .started promise that resolves perhaps if someone wants to await it to ensure that the stream has at least done one request or something - but errors will be caught in both the global onError handler if provided as well as in the individual subscriber error handlers.
Although there is an argument for late subscription for power users and edge cases...
I am of the opinion that if someone is this much of a power user and requires this sort of undefined behaviour they should be able to fairly easily write up their custom client implementation of the protocol - but also I think it's better to optimize the client for the intended use cases with minimal API surface and clear semantics and wait until we get requests for anything else!
Initial reaction to builder concept was a -1, but it actually translates really nicely to a builder pattern for client side processing of the streams - so I'm a +1 now:
const comments = new ShapeStreamBuilder()
.setUrl(url)
.setShapeDefinition(shapeDef)
const issues = new ShapeStreamBuilder()
.setUrl(url)
.setShapeDefinition(shapeDef)
.query() // Start building a query (may not be needed)
.include({
// Client side include of the two shapes
shape: comments,
key: 'id', // column on issues
on: 'issue_id', // column on comments
as: 'comments' // adds the comments as a 'comments' array prop on all issues
})
.subscribe((msg) => doStuff(msg))
.run() // implies .run() on comments
const issues = new ShapeStreamBuilder()
.setUrl(url)
.setShapeDefinition(shapeDef)
const comments = new ShapeStreamBuilder()
.setUrl(url)
.setShapeDefinition(shapeDef)
.query() // Start building a query (may not be needed)
.join({
// Client side join of the two shapes
shape: issues,
key: 'issue_id', // column on comments
on: 'id', // column on issues
as: 'issue' // adds the issue as an 'issue' prop on all comments
})
.subscribe((msg) => doStuff(msg))
.run() // implies .run() on issues
@samwillis huh that is pretty compelling
@samwillis happy that it works with that pattern as well! (although I'm unclear if the comments builder .run() call would also "build" the issues stream as well - i.e. "deep run")
I do want to stress the point that the declarative pattern is easier to deal with and does not preclude us from having the builder pattern/ lazy sub as well - but I think our "core" API being declarative makes it easier to test/maintain (and it's my personal preference hehe... objective opinions only)
The main issue with our current approach is that we allow people to subscribe to an ongoing stream, which has very limited and particular use cases and is much more likely to cause hard-to-catch bugs (it's always possible to just start a stream with a given offset if someone wants to "resume" mid-stream)
@msfstef agreed, in general I prefer a declarative api.
Thinking about it further, we don't need the implied "deep run", but note that the output of the join/include won't start until both streams have started.
I wonder if the solution to the problem of passing a ShapeStream to a Shape is simply not to do it. Rather than:
const stream = new ShapeStream({
url: `http://localhost:3000/v1/shape`,
table: 'items'
})
const shape = new Shape(stream)
we do:
const shape = new Shape({
shape: {
url: `http://localhost:3000/v1/shape`,
table: 'items'
}
})
This is what we did with the PGlite sync plugin: https://pglite.dev/docs/sync#using-the-sync-plugin-alpha
@KyleAMathews note that we could add multiple subscribe calls at intermediate steps in a shape query:
const comments = new ShapeStreamBuilder()
.setUrl(url)
.setShapeDefinition(shapeDef)
.query()
// we don't need to save this bit, just subscribe
// count all comments
comments.count().subscribe((msg) => ...)
const issues = new ShapeStreamBuilder()
.setUrl(url)
.setShapeDefinition(shapeDef)
.query() // Start building a query (may not be needed)
.include({
// Client side include of the two shapes
shape: comments,
key: 'id', // column on issues
on: 'issue_id', // column on comments
as: 'comments' // adds the comments as a 'comments' array prop on all issues
})
.subscribe((msg) => doStuff(msg))
.count()
.subscribe((msg) => ...) // count all issues
.run() // implies .run() on comments
@samwillis re: Shape instantiation - absolutely agree, see above comment:
The way to do it with the declarative approach would be that Shape actually accepts Omit<ShapeStreamOptions, 'subscribers'> rather than a ShapeStream instance, and it internally creates a shape stream with itself as a subscriber.
Ultimately a shape owns the stream, as it needs to be a subscriber to it from the start and start from offset -1 - perhaps you could even just specify new Shape(opts: ShapeStreamOptions) and the shape just adds itself as a subscriber on top of any subscribers that you have provided - but that might just be confusing.
Here's another thought: we don't want either declarative or builder. The nature of our system is that we create streams, then process streams. Stream is the only class. Everything else just processes those streams. Shape should just be:
import { ShapeStream } from '@electric-sql/client';
import { reduce } from '@electric-sql/stream';
const issueStream = new ShapeStream({...})
issueStream
.pipe(
reduce()
)
.subscribe((issues) => console.log(issues)); // Output: [issue1, issue2, issue3]
// A declarative Shape
function shape(shapeStreamOptions) {
const stream = new ShapeStream(shapeStreamOptions)
return stream.pipe(reduce())
}
const issues = shape({...})
issues.subscribe((issues) => console.log(issues)); // Output: [issue1, issue2, issue3]
This relates to what we were talking about on discord about a generic stream lib. We have stream readers (one of which of course is ShapeStream) which provide a stream of operations against a fixed schema. And then stream operators which can map, filter, reduce, join, various aggregators.
On autostart or not — in the stream processing world — most streams don't start moving bytes until there's a reader — so creating a stream and then not using it until it's piped in a processing pipeline or subscribed to directly makes sense to me.
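To illustrate the "nothing moves until there's a reader" behaviour, here is a small sketch. `LazyStream` is a made-up class that supports a single reader; its producer callback stands in for whatever would actually fetch data.

```typescript
// Hypothetical sketch: LazyStream is illustrative and supports one reader.
type Reader<T> = (value: T) => void

class LazyStream<T> {
  private readonly producer: (emit: Reader<T>) => void
  private running = false

  constructor(producer: (emit: Reader<T>) => void) {
    this.producer = producer // stored, not called: construction does no work
  }

  // The producer only starts once a reader attaches.
  subscribe(reader: Reader<T>): void {
    if (this.running) throw new Error("this sketch supports a single reader")
    this.running = true
    this.producer(reader)
  }
}
```

With this shape, creating a stream and wiring it into a pipeline later is safe, because no messages can be missed between construction and subscription.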
There's also the idea of replayable streams - we have that already with our offset-based caching scheme - so the result of any stream operator would also be URL-addressable and HTTP-cacheable, as each stream operator just emits processed operations, e.g. reduce just keeps update operations with the latest value. So if you want to do some pre-processing of tables to join and tweak the data structures a bit, you could do that and then expose the result as a standard shape URL.
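A sketch of what "keeps the latest value" could mean for a reduce-style operator. The `Operation` shape and the `compact()` function are assumptions made for illustration, not the real wire format, and re-emitting each surviving row as a single insert is one possible choice rather than something stated in this thread.

```typescript
// Hypothetical sketch: the Operation shape and compact() are assumptions
// made for illustration, not the wire format of the real protocol.
type Operation = {
  key: string
  operation: "insert" | "update" | "delete"
  value?: Record<string, unknown>
}

// Collapse a log into at most one operation per live key, carrying the
// latest merged value. Deleted keys are dropped entirely.
function compact(log: Operation[]): Operation[] {
  const latest = new Map<string, Operation>()
  for (const op of log) {
    if (op.operation === "delete") {
      latest.delete(op.key)
      continue
    }
    const previous = latest.get(op.key)
    latest.set(op.key, {
      key: op.key,
      operation: "insert",
      value: { ...previous?.value, ...op.value },
    })
  }
  return Array.from(latest.values())
}
```

The output is itself a list of operations, so it could be served and cached the same way as the input log.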
@KyleAMathews I like the idea of the stream being "pull based" - and if you want multiple subscribers, you can first setup a subscriber to forward messages to e.g. 3 other subscribers "down the pipe" and then subscribe to the stream and you achieve the same effect.
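The forwarding subscriber described here can be very small. `fanOut` below is a hypothetical helper, not part of any library mentioned in this thread; it forwards synchronously and does no buffering.

```typescript
// Hypothetical helper: fanOut is not part of any library mentioned here.
type Handler<T> = (messages: T[]) => void

// Build one subscriber that forwards every batch to each target in order.
function fanOut<T>(...targets: Handler<T>[]): Handler<T> {
  return (messages) => {
    for (const target of targets) target(messages)
  }
}
```

The stream still sees exactly one subscriber, and the set of downstream targets is fixed when fanOut is called.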
Making our stream match a standard stream library behaviour would be really nice
Making our stream match a standard stream library behaviour would be really nice
I'm not sure we can w/o a lot of acrobatics — but it's definitely worth giving it a serious look cause yeah, it'd save a lot of work
if you want multiple subscribers, you can first setup a subscriber to forward messages to e.g. 3 other subscribers "down the pipe"
"tee" is a typical term here for splitting the stream — then if you have one fork in the stream that's reading faster, the tee will buffer for the slower reader.
If all our stream stuff can be fronted with HTTP and can cache logs to disk or other places, then the buffering & replay happens pretty much automatically.
I like the reduce API.
e.g.
new ShapeStream({ subscribe: () => {} })
The problem with .subscribe() is that people get the sense that they can subscribe whenever and they'll get the full stream, whereas if you delay calling .subscribe you'll miss the early messages. So to avoid this footgun and, in general, keep construction of streams fixed & declarative, let's move subscriptions to the constructor.