estuary / flow

🌊 Continuously synchronize the systems where your data lives, to the systems where you _want_ it to live, with Estuary Flow. 🌊
https://estuary.dev

discussion: read priorities and catching up (complete vs incomplete datasets) #72

Closed psFried closed 3 years ago

psFried commented 3 years ago

Conceptually, I think we're reconsidering what a flow collection is. We know that it represents a conceptual sequence of documents, and we know that the schema is part of a collection. A collection can be either captured or a derivation, and the particular variant is conceptually part of the collection. If the collection is a derivation, then all the constituent transforms are also part of the collection. In other words, for derivations, what a collection is includes all of the information required to actually populate it with data and keep it up to date.

But for captured collections, how the data arrives there is not part of the collection. Or perhaps more accurately, we currently have an over-simplified model of this. Our current model says that the data source for every captured collection is just flow-ingester. While that's technically true, it's sort of akin to saying that all of my food sources are forks. It misses the point, and precludes the ability to have different semantics for different sources. So, I want to try to come up with a taxonomy of captured collection data sources. The idea being that a design for including data source information in captured collections should be informed by this.

I think the important way to classify datasets is whether they are intended to be complete or not. A complete dataset might be produced from some existing ETL workflow that rolls up all records every time it's run. Or it might be taken from some "awesome public" dataset in a github repo. This is in contrast to an incomplete dataset, which might be produced from an ETL workflow that rolls up all data from the last week, or from an application that ingests data in realtime.

Incomplete datasets

These are our bread and butter, and what flow is currently the best at dealing with. Additionally, I'd speculate that this is also the more common type of thing that people would want to ingest. These datasets, whether they're a single document that's sent in an HTTP POST, or a million documents that are sent in a weekly CSV, share the property that they are intended to be appended to an existing collection. For example, say you have an existing ETL job that produces the sales data for the last week. You'd want to continue to append that to the same collection because it's fundamentally additive.

Complete datasets

A good example of a complete dataset is one of the many "awesome public datasets" available in github repos. But I think probably the more important example for Flow users would be any of their existing ETL jobs that roll up data from the beginning every time they're run. These will be important because we need to provide some way of integrating them into Flow catalogs without re-writing them.

Complete datasets are not what we've optimized for, partly because they're almost never the best way to deal with data. People would almost always get more value from periodically or continuously ingesting incomplete data. But I think real world adoption will be much more difficult if we don't have a story around these.

Representation

There's nothing about the dataset or its representation that can tell you whether it's complete or incomplete. A complete dataset may be represented as a single CSV on disk, multiple JSON files in an s3 bucket, or even as a series of HTTP requests to flow-ingester. Same goes for an incomplete dataset.

In Catalog Specs

I'll take a crack at a concrete proposal here for how these concepts could map to a flow catalog. A key insight that's motivating this particular design is that even complete datasets might be later augmented with corrections and adjustments, and that we should have the flexibility to allow for that. Conversely, incomplete datasets might want to bootstrap from a CSV or other static file before they add incremental updates later. So maybe a more precise discriminant is whether the dataset is expected to be complete or incomplete before derivations start processing it.

When creating a captured collection, we would add a sources property, which is an array of data sources that will provide the documents for the collection. Examples of data sources include:

Generally, each data source could be used for either complete or incomplete datasets. For example, you might want to watch a cloud storage bucket and ingest new files as they get added, or the user may decide that a file represents a complete dataset.

Example YAML:

collections:
- name: myCapturedCollection
  schema: myschema.yaml
  key: [/id]
  sources:
    - name: foo
      complete: true
      # REST and WEBSOCKET are always implicitly defined, and represent the ability to use flow-ingester's features
    - name: REST # complete is always implicitly false
    - name: WEBSOCKET

captures:
  foo:
    uri: s3://mybucket/myobject

In this example, flowctl would trigger the ingestion (waves hands) from the s3 bucket and would ensure that it completes. But ingestion using REST and websockets would also be allowed. I think it makes sense for complete to always default to false, since that's really where we want to guide people in the majority of cases.

Modeling the concept of complete data sources enables flowctl to automatically trigger all ingestions in the proper order. I have no idea about the details of this feature, but I'll try to stick to what I think are the relevant points.

We could do a topological sort of collection dependencies and perform ingestion of all complete data sources prior to starting the derivations that depend on those collections. So given captured collections A and B, and derivation X that uses both A and B as sources, we would complete ingestion of all complete data sources of A and B prior to initializing derivation X. This would be part of the (as yet unspecified) flowctl deploy subcommand. Doing this ensures that there's no way for documents to be ingested out of order with respect to the derivations you have. This feature could also make for some really nice demos, and help with local development in general, since you'll no longer need bash scripts to re-ingest the data that's used for manual testing.
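To make the ordering concrete, here is a minimal Python sketch of the topological sort described above. It is only an illustration under assumptions: the `dependencies` mapping and the `deploy_order` helper are hypothetical, not part of flowctl, and the actual ingestion step is left out entirely.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each collection maps to the collections it reads from.
# Captured collections (A and B) have no upstream collections.
dependencies = {
    "A": set(),
    "B": set(),
    "X": {"A", "B"},  # derivation X uses both A and B as sources
}


def deploy_order(deps):
    """Return collections ordered so that every source precedes its derivations."""
    return list(TopologicalSorter(deps).static_order())


order = deploy_order(dependencies)
# A deploy step could ingest the complete sources of each captured collection
# in this order before initializing any derivation that depends on it.
```

In this sketch A and B both come before X, while the relative order of A and B is unspecified because neither depends on the other.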

Re-deploying Catalogs

Of course this could complicate the deployment of new catalogs when an existing catalog is already running. In my current thinking, this is the trickiest part. Automatically ingesting complete datasets is a completely nonviable feature unless we can have well-defined and easy-to-understand semantics for handling changes over catalog deploys. There are also likely multiple alternatives to what I'll propose here, so I think it's pretty solvable.

What I'll propose initially is that we disallow modification of any complete data sources for a collection. The rationale is that if you want a different complete dataset, then what you really want is a new collection.

Fin

Ok I'm running out of steam, and this has gotten a bit long and (hopefully not too) rambling. But I think this is a good place to get feedback and work collaboratively toward an actionable plan. If you stayed with me this far, then thanks for your patience.

psFried commented 3 years ago

Quick thought on:

  sources:
    - name: foo
      complete: true
      # REST and WEBSOCKET are always implicitly defined, and represent the ability to use flow-ingester's features
    - name: REST # complete is always implicitly false
    - name: WEBSOCKET

This representation differs a bit from what's described in #68. The rationale for that is really just to provide a more obvious ordering with respect to the sources, which is really only relevant if we allow multiple complete sources for a single collection. It may be preferable to only allow a single complete source for each collection, in which case the representation described in #68 would probably be preferable due to its ability to bind the endpoint to the collection in a different yaml spec than the one the collection was declared in.

jgraettinger commented 3 years ago

Thanks for writing this up. It's a good conversation to have.

Let me start with my own worldview, and how it aligns vs differs:

Agreed that collections are discriminated on captured vs derived, and for derived collections, the transformations (and their evolving history) are part of the definition and identity of that collection. Also agreed that the means by which data is captured into a collection is part of its identity as well. This cuts both ways: a primary motivation for moving materializations back into the catalog is because they're also part of a collection's identity -- a collection doesn't have meaning in a vacuum, but through its interconnections to other collections and systems.

For me the operative concept is binding. Collections are the primary concept in Flow; we may de-emphasize their system-of-record status within demos & conversations, in order to meet users where they are, but from a Flow specification perspective they're primal. Collections are bound to other systems from which data is consumed, or to which it's produced. These bindings are a top-level concern. This is required for development ergonomics -- for one, it lets me alter how a collection is populated in testing vs staging vs prod environments.

On complete-ness: I fundamentally don't believe there is such a thing as a complete dataset. It's a mirage, a pleasant fiction. Let's ground out in the example of a collection that's built from an S3 bucket and path. I might be dumping results from an ETL pipeline into that bucket. Perhaps it's an incremental update, but perhaps it's a "one-time" processing of the dataset. Or perhaps I'm dumping in what I consider to be a "static" dataset into a bucket:

In both of these cases, complete-ness still doesn't really exist. There's always the potential for updates, recants, new versions of a "static" dataset. A file can be added to a bucket at any time -- we must define what happens then.

My belief is that those files should be picked up and processed as they're discovered. If they "over-write" documents already written (e.g., they repeat a collection key), that's handled through Flow's typical last-write-wins behavior, flowing that change through derivations and materializations.
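As a rough illustration of the last-write-wins behavior mentioned above, the following Python sketch folds documents into a per-key state in ingestion order. The function name and the `id` key are assumptions made for the example; this is not Flow's implementation, only the observable effect of a later document replacing an earlier one with the same key.

```python
def apply_last_write_wins(documents, key="id"):
    """Fold documents in ingestion order; a later document replaces an earlier one with the same key."""
    state = {}
    for doc in documents:
        state[doc[key]] = doc
    return state


# A file discovered later repeats key 1, so its document supersedes the earlier one.
first_file = [{"id": 1, "total": 10}, {"id": 2, "total": 20}]
later_file = [{"id": 1, "total": 15}]
state = apply_last_write_wins(first_file + later_file)
```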

(I can almost hear you asking, "what about deletion?". I'll assert that deletion is, and always secretly has been, a query-time concern -- e.g., MVCC databases still certainly "have" deleted records, they just know not to return them. Same concept applies here, either by document tombstone enhanced with a prune policy annotation, or attaching a dataset "version" ID to documents).

Re: topological sorting of collections, I think the question you're trying to address is "how do I ensure all documents of collection A are processed before any of collection B, which joins against them?"

Here I'll point out this is a technical impossibility, because you can always race adding files to A's bucket, vs making a decision that B is ready to process.

It also isn't a question that can be divorced from the Q of when documents are written to A and B. Collections are long-lived and continuous -- and I suspect part of the reason you're feeling like we need to do more here, is that that hasn't been our actual experience w/ collections so far, as collections have been short-lived testing invocations -- they're long-lived and continuous, and ingestion time is the primary means of ordering between two documents. Flow already enforces an overall, temporally consistent ingestion order as its baseline.

We can tweak it; we do with readDelay today, and have discussed a priority concept in the past -- which may be a neat way at getting at some of the desire here -- but the central point is that "ordering" is evaluated in the context of either a) picking among two ready-to-read documents, or b) deciding to "gate" for more data, despite having other ready-to-process documents.

jgraettinger commented 3 years ago

A couple of other comments:

psFried commented 3 years ago

I fundamentally don't believe there is such a thing as a complete dataset. It's a mirage, a pleasant fiction.

I agree completely with this statement. My assertion here is that it's a fiction that should have first-class support within flow. It might be worth clarifying the working definition of "complete", as I re-read my comment above and realized it may not be clear. A complete dataset is one that accounts for all data from the beginning of time up until the present moment. Whereas an incomplete dataset only accounts for a subset of a timeline.

beginning<-------------------------------------->now
         ^----- A -----^---- B -----^--- C -----^
         ^------------------ X -----------------^

A, B, and C are incomplete datasets because they each only cover data from a subset of the timeline. X is a "complete" dataset because it covers the entire timeline. The fundamental problem is that you need to treat these datasets differently. Of course they're both incomplete as soon as you account for the future in that timeline. But if now represents the time at which a user runs flowctl deploy, then I think we need some way of dealing with these issues:

Re: topological sorting of collections, I think the question you're trying to address is "how do I ensure all documents of collection A are processed before any of collection B, which joins against them?"

Here I'll point out this is a technical impossibility, because you can always race adding files to A's bucket, vs making a decision that B is ready to process.

I think this is cleared up when we account for a different notion of "completeness", but I'll try to express my thoughts here anyway just in case it helps to clarify things. Of course there's no such thing as "all documents of collection A". What is well defined is "all documents from this set of CSV files at this point in time". I think that's the problem that users actually have, and what we should try to help with in Flow. If we continue to watch the bucket and add new documents, I see that as a win. But I would still argue that we should handle the case where we need to ingest CSV A before CSV B in order for derivation Z to work correctly. I agree that the use case for this is probably much more common in testing and demos than it will be in production. But I think there are valid production use cases, and testing and demos are pretty important, too.

psFried commented 3 years ago

We just discussed this, and I want to record our conclusions here for posterity. The problems that users are likely to face when dealing with these datasets are:

  1. Ingestion order might matter if you don't write the join logic correctly
  2. The absence of a row in a "complete" dataset could be interpreted as an implicit deletion (from the user's perspective), and we don't yet have good support for dealing with the range of deletion scenarios.

We don't think we actually need a new concept of "completeness" in order to handle these.

The first issue is better handled by implementing join logic with the desired semantics. This is something that users can do today with the existing functionality. The immediate action item here is to add documentation that guides users toward a correct solution.

The second issue is one that we think we can better solve by using the "pruning" reduction annotations that are described as part of #71. Once those annotations are available, users would be able to implement the deletion of records based on a dataset_version_id or similar field.
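One way to picture deletion based on a dataset_version_id is the Python sketch below. It does not use the annotations from #71, whose design is not shown in this thread; the `visible_rows` helper and the `id` key are hypothetical, and the sketch assumes that version IDs are comparable and increase with each new "complete" dataset.

```python
def visible_rows(documents, key="id", version_field="dataset_version_id"):
    """Keep the latest document per key, then hide keys that are absent from the newest dataset version."""
    latest_by_key = {}
    newest_version = None
    for doc in documents:
        latest_by_key[doc[key]] = doc  # last write wins per key
        version = doc[version_field]
        if newest_version is None or version > newest_version:
            newest_version = version
    return {
        k: doc
        for k, doc in latest_by_key.items()
        if doc[version_field] == newest_version
    }


# Version 1 contains rows 1 and 2; version 2 contains only row 1.
# Row 2 is missing from the newest version, so it is treated as implicitly deleted.
docs = [
    {"id": 1, "dataset_version_id": 1},
    {"id": 2, "dataset_version_id": 1},
    {"id": 1, "dataset_version_id": 2},
]
rows = visible_rows(docs)
```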

jgraettinger commented 3 years ago

Thinking a bit more after our conversation, I will say that a transform priority is increasingly motivated for me. I see it as solving a couple of problems:

jgraettinger commented 3 years ago

Specifically, the proposed behavior of priority is that all available documents of a higher-priority transform source must be fully processed before any documents of a lower-priority transform. This would be enforced in the shuffled read loop, essentially by transitioning the current ordering over (timestamp + read-delay) into (priority, timestamp + read-delay).
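The change of ordering key can be sketched in Python as follows. This is only an illustration over documents that are already available to read, not the shuffled read loop itself: the `read_order` helper and the field names are hypothetical, and it assumes that a larger number means a higher priority.

```python
def read_order(ready_docs):
    """Order ready documents by (priority, timestamp + read-delay), highest priority first."""
    return sorted(
        ready_docs,
        # Negate priority so that higher-priority sources sort first (an assumption of this sketch).
        key=lambda d: (-d["priority"], d["timestamp"] + d["read_delay"]),
    )


ready = [
    {"source": "B", "priority": 0, "timestamp": 1.0, "read_delay": 0.0},
    {"source": "A", "priority": 1, "timestamp": 5.0, "read_delay": 0.0},
    {"source": "A", "priority": 1, "timestamp": 2.0, "read_delay": 1.0},
]
ordered = read_order(ready)
```

Here both documents from A are read before the document from B, even though B's document has the earliest timestamp. Within A, the adjusted time (timestamp plus read delay) still decides the order.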

psFried commented 3 years ago

If we implemented this, it would essentially give users two different options for dealing with the ordering. Either they can make their joins fully reactive, or they could use simpler join logic and set a priority for each source. I'm uncertain of how to weigh those two alternatives. Are there any indications or contra-indications for one vs the other?

I guess I can try to answer my own question... Given a basic reactive inner join of collection A to B where we're storing the entire documents from both collections, I think we'd expect that the primary cost of the join would be in terms of the register storage, which would be roughly equivalent to the combined storage size of A and B. If we used a read priority instead, then we could implement the join to only store one side in the register. Does that sound about right?

jgraettinger commented 3 years ago

Yep that's it exactly. Fully-reactive joins require that the tracked state on both sides is bounded, such that it can be stored in a register / DB record. If both sides of the relation are fundamentally reducible, then you're golden, and fully-reactive joins are your friend.

This breaks down if the core problem is one of event enrichment, where there can be lots of B's that must join in their entirety against far fewer A's. It may be prohibitive to store all B's for a fully reactive join. If you want to relate historical B's against the last-known A, priority would be a very efficient way to do it.

Probably less frequent, but also important, is when I'm dealing with a conceptual event stream rather than a table (e.g., I'm materializing into pub/sub or an API), and I need to filter out previous B's based on later A's.
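The storage trade-off between the two join styles discussed above can be sketched in Python. Both functions are hypothetical illustrations rather than Flow derivations: plain dicts stand in for registers, events are `(side, key, doc)` tuples, and `priority_join` assumes that a read priority has already ensured all available A's are processed before any B's.

```python
def reactive_join(events):
    """Fully reactive inner join: stores both sides, so it can emit in any arrival order."""
    a_state, b_state, out = {}, {}, []
    for side, key, doc in events:
        if side == "A":
            a_state[key] = doc
            # A newly arrived A joins against every B already seen for this key.
            out.extend((doc, b) for b in b_state.get(key, []))
        else:
            b_state.setdefault(key, []).append(doc)
            if key in a_state:
                out.append((a_state[key], doc))
    stored = len(a_state) + sum(len(bs) for bs in b_state.values())
    return out, stored


def priority_join(events):
    """Join that stores only the A side, relying on A's being read before B's."""
    a_state, out = {}, []
    for side, key, doc in events:
        if side == "A":
            a_state[key] = doc
        elif key in a_state:
            out.append((a_state[key], doc))
    return out, len(a_state)


arrival = [("B", "k", "b1"), ("A", "k", "a1"), ("B", "k", "b2")]
# Stand-in for read priority: all A events first, original order otherwise preserved.
prioritized = sorted(arrival, key=lambda e: e[0] != "A")

reactive_out, reactive_stored = reactive_join(arrival)
priority_out, priority_stored = priority_join(prioritized)
```

In this example both joins emit the same pairs, but the reactive join keeps three documents in state while the priority-based join keeps one.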

jgraettinger commented 3 years ago

Read priorities have been implemented. I think the actionable bits of this have been resolved so I'm closing, but discussion is still here for reference.