
Continuous schema inference #816

Closed · jshearer closed this 9 months ago

jshearer commented 1 year ago

The existing schema inference service, which reads only ~10s of data, was a valuable change to make: it unblocks us from removing schema inference from connectors, and it lets us start building out the UI/UX around schema inference. That said, it really isn't good enough on its own, and we need continuous inference ASAP. Here's my strawman proposal for how to implement it, based on what I remember from our conversations at the last offsite, a conversation I had with @psFried, and comments on my Slack post from last month.


Overview

Rather than attempting to infer the schema of a collection directly, we'll infer a schema for each task shard. Collection schema inference then becomes a matter of finding all of the schemas from shards of tasks that write into that collection, and merging them. Not only does this allow for much higher data-parallelism, it also lets us get the inferred schema of entities other than collections, such as derivations, essentially for free, which could be a really valuable part of a future derivation-authoring workflow.

Shards

Since we're inferring schemas at the shard level, each shard will keep track of the latest schema as far as its slice of the world is concerned. The shard will compare each document it sees against this schema; if the document doesn't match, it will update its internal schema to match the document and emit the updated schema to the owning tenant's ops/[tenant_name]/schemas collection, making sure to include metadata such as the shard/task name. This will allow us to slice up these schemas later when querying for e.g. the latest inferred schema for a particular collection.
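
As a rough sketch of that per-shard loop (every name here is an illustrative placeholder rather than Flow's actual doc crate API; `document_matches` and `widen_to_accept` stand in for real schema validation and shape inference):

```rust
use serde_json::{json, Value};

struct ShardInference {
    task_name: String,
    shard_id: String,
    schema: Value, // the shard's current inferred schema
}

impl ShardInference {
    fn new(task_name: String, shard_id: String) -> Self {
        // The internal schema is not persisted: every shard restart begins
        // from an empty schema and re-widens it as documents are observed.
        Self { task_name, shard_id, schema: json!(false) }
    }

    /// Observe one document processed by this shard.
    fn observe(&mut self, doc: &Value, emit: &mut dyn FnMut(Value)) {
        if document_matches(&self.schema, doc) {
            return; // common case: no schema change, nothing to emit
        }
        // Widen the internal schema to accept `doc`, and emit it along with
        // metadata so it can later be sliced by task, shard, and collection.
        self.schema = widen_to_accept(self.schema.clone(), doc);
        emit(json!({
            "task": self.task_name.clone(),
            "shard": self.shard_id.clone(),
            "schema": self.schema.clone(),
        }));
    }
}

// Stubs standing in for real schema validation and shape inference; in Flow
// these would be backed by the doc crate rather than hand-rolled here.
fn document_matches(_schema: &Value, _doc: &Value) -> bool {
    false // stub: treat every document as requiring a widening
}
fn widen_to_accept(schema: Value, _doc: &Value) -> Value {
    schema // stub: return the schema unchanged
}
```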

In addition, this internal schema will be stateless (not persisted), starting out empty each time the shard starts up. This will result in a burst of schema updates any time a shard restarts, as it "rebuilds" its internal schema. We should run some experiments to see how bad this is, but I would not expect more than a few dozen updates before the schema settles/converges.

Controlling pathological scenarios

There are pathological scenarios that would result in a continuous stream of schema changes if we use the current inference logic, such as a capture that emits documents whose keys change every time. One possible solution to this problem would be to roll up schema updates and only emit them on a particular interval. The problem with doing this is that if a shard fails in such a way that it can't write out its schema, then we'll drop any pending schema updates on the floor and not know that we have to re-read those documents and infer their schema. This could result in a situation where some documents never match the inferred schema.

I would argue that this actually points to an improvement we should make in the schema inference logic: even if we solved the problem of dropping schemas on the floor and implemented the roll-up logic, we would still end up in situations where we have ever-growing schemas. Instead, we should introduce the ability to switch object inference to use additionalProperties past a certain number of fields, as suggested here, or possibly even infer patternProperties if we feel extra adventurous.
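
As a hedged illustration of that threshold idea (`MAX_PROPERTIES`, `collapse_properties`, and `merge_schemas` are hypothetical names, and this operates on plain JSON Schema values rather than doc::inference shapes): once an inferred object schema enumerates more than some fixed number of fields, fold the per-property schemas into a single additionalProperties schema instead.

```rust
use serde_json::{json, Value};

// A "fairly tight" threshold, per the discussion below; the exact value is a guess.
const MAX_PROPERTIES: usize = 32;

fn collapse_properties(schema: &mut Value, merge_schemas: impl Fn(Value, Value) -> Value) {
    let Some(obj) = schema.as_object_mut() else { return };
    let Some(props) = obj.get("properties").and_then(Value::as_object).cloned() else {
        return;
    };
    if props.len() <= MAX_PROPERTIES {
        return; // still enumerating individual fields
    }
    // Merge all per-property schemas into one, and use it as additionalProperties.
    let merged = props
        .into_iter()
        .map(|(_, v)| v)
        .reduce(|l, r| merge_schemas(l, r))
        .unwrap_or(json!(true));

    obj.remove("properties");
    obj.remove("required"); // field names are no longer enumerated
    obj.insert("additionalProperties".to_string(), merged);
}
```

This trades field-level precision for a bounded schema size, which is the behavior we want when keys are dynamic.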

Looking at prior art, this does appear to be a problem that other json-schema inference libraries have solved, see json-typedef-infer's --values-hint option, for example.

Discussion point 🚨:

After typing out the options, I'm not sure that inferring additionalProperties/patternProperties is actually a solvable problem in the general case; it might require some form of user-specified hints, which adds a whole other set of complications. So I'm not entirely sure what the right path forward is: if we don't control for dynamic keys, we get an ever-growing schema even if we roll up over time, and if we do support inferring dynamic keys, we'll probably get it wrong for some classes of input.

Getting the schema

Ultimately, the plan is to materialize inferred schemas at the collection level to Supabase, which will allow the schema inference service to become a simple database query. In order to implement this, we'll need to add a new reduction annotation type that can reduce JSON-schema-formatted documents together, likely using an ObjShape::intersect(lhs: Self, rhs: Self) -> Self function in doc::inference. Then we can write a materialization that shuffles on collection name and reduces the schemas together into one, which we write out to the database.
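
To make that reduction concrete, here's a deliberately tiny, self-contained stand-in that merges two inferred JSON Schemas directly as JSON values, so the result accepts documents matched by either input. The real reduction would go through doc::inference shapes as described above; this toy drops any keyword it doesn't understand, which only makes the result more permissive.

```rust
use serde_json::{json, Map, Value};

/// Merge two inferred JSON Schemas into one that accepts documents matched by
/// either input. Only `type` and `properties` are handled; everything else is
/// dropped, which widens (never narrows) the result.
fn merge_schemas(lhs: Value, rhs: Value) -> Value {
    let (Some(l), Some(r)) = (lhs.as_object(), rhs.as_object()) else {
        return json!(true); // anything we can't merge structurally degrades to "accept anything"
    };
    let mut out = Map::new();

    // Union of observed `type` keywords.
    let mut types: Vec<Value> = Vec::new();
    for v in [l.get("type"), r.get("type")].into_iter().flatten() {
        let candidates = match v {
            Value::Array(a) => a.clone(),
            other => vec![other.clone()],
        };
        for t in candidates {
            if !types.contains(&t) {
                types.push(t);
            }
        }
    }
    if !types.is_empty() {
        out.insert("type".into(), Value::Array(types));
    }

    // Recursively merge object properties present on either side.
    if l.contains_key("properties") || r.contains_key("properties") {
        let mut props = Map::new();
        for p in [l.get("properties"), r.get("properties")]
            .into_iter()
            .flatten()
            .filter_map(Value::as_object)
        {
            for (k, v) in p {
                let merged = match props.remove(k) {
                    Some(prev) => merge_schemas(prev, v.clone()),
                    None => v.clone(),
                };
                props.insert(k.clone(), merged);
            }
        }
        out.insert("properties".into(), Value::Object(props));
    }
    Value::Object(out)
}
```

For example, merging a schema with property `a: integer` and one with property `b: string` yields an object schema carrying both properties.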

Before doing that work, @psFried suggested that we simply update the schema inference service to do this reduction itself, and I think this makes a lot of sense. We can read the ops/[tenant_name]/schemas collection, filter by the collection we care about (or just read the appropriate partition, if we partition by collection name, which we probably should), and intersect all the shapes together.
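
The query-time rollup then becomes a simple fold, sketched here with illustrative names (`SchemaUpdate` is hypothetical, and `merge_schemas` could be the pairwise merge sketched above): filter schema updates down to the target collection, or read just its partition, and reduce them into one schema.

```rust
use serde_json::Value;

/// One document read back from ops/[tenant_name]/schemas.
struct SchemaUpdate {
    collection_name: String, // ideally also a logical partition of the collection
    schema: Value,
}

/// Roll up every shard-reported schema for one collection into a single schema.
/// Returns None if no shard has reported a schema for that collection yet.
fn inferred_collection_schema(
    updates: impl IntoIterator<Item = SchemaUpdate>,
    target_collection: &str,
    merge_schemas: impl Fn(Value, Value) -> Value,
) -> Option<Value> {
    updates
        .into_iter()
        .filter(|u| u.collection_name == target_collection)
        .map(|u| u.schema)
        .reduce(merge_schemas)
}
```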

Practical implementation

jgraettinger commented 1 year ago

This plan sounds right to me. I do think we want an interval dampener in place from the get-go: for example, a shard would produce one of these documents no more than once every N minutes (observing documents for N minutes after startup before emitting a first document, and then as needed, but no less than N minutes apart).
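
A minimal sketch of such an interval dampener, assuming illustrative names and that the shard marks itself dirty whenever its inferred schema widens: it waits one full interval after startup before the first emission, and thereafter emits no more often than once per interval.

```rust
use std::time::{Duration, Instant};

struct Dampener {
    interval: Duration,       // "N minutes"
    started_at: Instant,
    last_emit: Option<Instant>,
    dirty: bool,              // the inferred schema has widened since the last emission
}

impl Dampener {
    fn new(interval: Duration) -> Self {
        Self { interval, started_at: Instant::now(), last_emit: None, dirty: false }
    }

    /// Record that the inferred schema changed.
    fn mark_dirty(&mut self) {
        self.dirty = true;
    }

    /// Returns true if an updated schema should be emitted now.
    fn should_emit(&mut self, now: Instant) -> bool {
        if !self.dirty {
            return false;
        }
        let since = match self.last_emit {
            Some(t) => now.duration_since(t),
            // Observe documents for one full interval after startup first.
            None => now.duration_since(self.started_at),
        };
        if since >= self.interval {
            self.dirty = false;
            self.last_emit = Some(now);
            true
        } else {
            false
        }
    }
}
```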

With that in place, the data volume should be pretty small and I suspect we can go a long way with rolling these up on demand, at query time. By the time we want to make this truly continuous, V2 derivations would ideally be in place and give us better implementation options.

Create ops/[tenant_name]/schemas collection during tenant creation. Partition by collection_name?

Partitioning on collection name is probably required for efficient query-time rollup (it lets us quickly determine the exact, minimal set of journals to read). I don't love that this effectively doubles the number of journals for a collection, though (since each collection is typically one journal). At the moment I don't see a way out of this.

We may want to name this ops/[tenant_name]/profiles or similar, to reflect a possible future feature of capturing data profiling on the user's behalf -- a close cousin of schema inference.

Instead, we should introduce the ability to switch object inference to use additionalProperties past a certain number of fields, as suggested here, or possibly even infer patternProperties if we feel extra adventurous.

Yep, we should introduce a (fairly tight) threshold after which we'll incorporate additional fields as additionalProperties.