MeltanoLabs / Singer-Working-Group

Working group for ongoing development and iteration of the Singer Spec, the de-facto protocol for open source data connectors. Please use "Issues" to create discussion items - or use "Discussions" for general questions.
Apache License 2.0
13 stars 4 forks source link

Stream-level composability for state artifacts #6

Open aaronsteers opened 2 years ago

aaronsteers commented 2 years ago

As discussed in: https://gitlab.com/meltano/meltano/-/issues/2903

Summary: We'd like to be able to run multiple copies of a tap in parallel and then merge the state from the two jobs. While this likely works in practice with most taps today, it requires some assumptions about how state is stored and where/whether merge conflicts are expected.

For example:

An orchestrator kicks off users and customers sync as two separate jobs, running in parallel. If the STATE output from one job contains a top-level users bookmark, and the STATE output from another job contains a top-level customers bookmark, we would like the two job outputs to be able to be merged together to create a single merged STATE output with both users and customers bookmarks combined.

aaronsteers commented 2 years ago

The inverse of this question came up in a recent discussion: "Should state artifacts be splittable?"

Meaning, from the state output of a tap execution with 3 streams: can I split that state artifact into three distinct states, one for each stream?

Another way to phrase this could be "composable and decomposable states".

cc @tayloramurphy , @DouweM , @dmosorast

tayloramurphy commented 2 years ago

@aaronsteers great call out. Perhaps this issue could be used to provide some structure for state in the spec? Currently documented in the spec is this phrase:

There structure of the value property is not defined by the spec and is determined by each tap independently.

But there is a "recommended" structure. Perhaps we could add a few optional fields to a given state message (or within the value key) such as a unique ID, extraction timestamps, and other metadata?

aaronsteers commented 2 years ago

@tayloramurphy - I like this approach personally. I think at the top level, this already exists for 95% of taps:

{
  "bookmarks": {
    "stream_1": {},
    "stream_2": {}
  },
  "nonessential_extra_1": {},
  "nonessential_extra_2": {}
}

Important to codify:

  1. bookmarks are named per stream, with the stream name matching the bookmark name
  2. stream bookmarks are atomic (meaning the stream_1 bookmark doesn't need anything from stream_2's bookmark, and vice versa)
  3. non-essentials may be dropped or clobbered with no loss of essential function

@dmosorast - What do you think of this approach, especially point 3. I know you use markers to resort/order streams. Could that (and other similar use cases) be deemed "non-essential" in the case of splitting/merging?

dmosorast commented 2 years ago

@aaronsteers Yes, I think the structure you specify there is the most common, and I think there is quite a bit of frameworky code that encodes this assumption. I agree with the "bookmarks" -> tap_stream_id -> { ...values } approach as a standard.

For the non-essentials portion, I'm wondering what values are put up there in practice. The only reason I can think of that would require values at the top level is if they do not apply to a specific stream, and the only key that comes to mind right now is currently_syncing which is indeed non-essential, and would just result in re-work if it was missing.

However, I think it is feasible that a tap could have a need for some other more crucial stateful values that would apply to all streams (e.g., a global min/max transaction ID for a database, or the last sync's start for a complex nested bookmarking strategy). Though, in this case, maybe those keys would be nested at the top level under bookmarks.

The more I think about it, the less I'm convinced that we can definitively say any keys outside of bookmarks are non-essential, but that might be a good seed for another proposal to come up with standards around the non-"bookmarks" section of state.

Clarifying Questions

To add some clarity for me, is this being proposed as a Standard to codify that per-stream bookmarks should be specified in this structure? or is it a spec change (e.g., changing how the STATE message is defined)?

The reason I ask is that a core concept of the STATE messages is that the result is a fully formed state object. Of course, the orchestration layer or some other higher-order framework can play with that as much as it wants, but as far as the spec is concerned, I'd be more hesitant to consider how to extend the state message itself.

aaronsteers commented 2 years ago

@dmosorast

To add some clarity for me, is this being proposed as a Standard to codify that per-stream bookmarks should be specified in this structure? or is it a spec change (e.g., changing how the STATE message is defined)?

I think it would be the former. No need to change how taps communicate STATE or how the STATE message works. The action here would just be to clarify guidance so that orchestrators can take this as a given and to the needed. 👍

However, I think it is feasible that a tap could have a need for some other more crucial stateful values that would apply to all streams (e.g., a global min/max transaction ID for a database, or the last sync's start for a complex nested bookmarking strategy). Though, in this case, maybe those keys would be nested at the top level under bookmarks.

I agree, at least in theory, this is a possibility. I think in those cases, we'd encourage any critical functionality to be encoded redundantly per stream. This is also important in general: since presumably toggling a new stream to be "selected=true" after previously being deselected would buy into the same downsides - for instance, if global progress in a binlog tracker, is inferred from a stream that wasn't previously scanned to that point.

dmosorast commented 2 years ago

@aaronsteers Great, as a stated Standard, I'm currently on board.

For the global state, that's a good point. In the case of global values, the ability to select and deselect streams requires the tap to be able to 1) Detect a stale bookmark entry on the global piece of state (even if it was a top-level piece) and 2) Be able to process a state to account for the earliest viable bookmark across streams.

I think this is how singer-io/tap-postgres accomplishes this for logical replication. So, as a piece of official guidance I think it would make sense to mention that global state should be stored per-stream to account for selection/deselection. It seems important to mention for the official SIP. (As an aside, I'm liking the SIP acronym)

dmosorast commented 2 years ago

Maybe as an added piece of "versioning" concern, there are some taps that don't ascribe to this standard, so versioning the standards might need to be considered as well.

As an example, singer-io/tap-stripe uses an odd way of requesting updates, and stores bookmarks in an unconventional way that violates this standard.

What does that mean as a whole to orchestrators using that tap-stripe? The beneficial assumptions won't be able to be made for it, but a refactor of how state is written is a pretty huge breaking change that would require some kind of step-wise upgrade procedure. I'm curious about your thoughts on that kind of "certification path".

aaronsteers commented 2 years ago

Maybe as an added piece of "versioning" concern, there are some taps that don't ascribe to this standard, so versioning the standards might need to be considered as well.

It's a good point, and this probably warrants its own issue - and how we approach probably depends on the specifics of the part of the spec and the type/degree of incompatibility.

but a refactor of how state is written is a pretty huge breaking change that would require some kind of step-wise upgrade procedure

State format changes are specifically tricky. In the SDK, we have a overloadable load_state() which we know will need to handle prior conventions (if we ever change the current ones) and if we ever do change where/how state is stored, we would need to do a one-time translation of old-state-convention to new. I imagine the same process would need to be implemented for tap-stripe (for instance) if the convention is changed and there are legacy state artifacts which do not match latest/newer conventions that the tap wants to support in future.

aaronsteers commented 2 years ago

Great, as a stated Standard, I'm currently on board.

Sweet!

FYI @tayloramurphy, I think this is ready to be drafted up and commented on.

kgpayne commented 2 years ago

Just a note to say Meltano already merges STATE messages for failed/incomplete previous pipeline runs. This is effectively done to update the 'last good run' with any streams that were able to successfully complete and emit STATE (even if the overall job failed). Logic is here.

aaronsteers commented 2 years ago

Adding to the above, the merge logic is here: https://gitlab.com/meltano/meltano/-/blob/master/src/meltano/core/utils/__init__.py#L141-157

Based on a simple recursive dict merge logic as shared here: https://stackoverflow.com/a/20666342/4298208

edgarrmondragon commented 2 years ago

@aaronsteers I think the merge algorithm (as pseudocode) should probably be part of the SIP. Wdyt?

aaronsteers commented 2 years ago

@edgarrmondragon - Yes, I think so! I'm not sure though if we should infinitely recurse dictionaries or just root + "bookmarks" + optionally "partitions" when it is a grandchild of "bookmarks".

Whatever is deemed safe and most interoperable, would be good to include in the SIP text.

aaronsteers commented 2 years ago

Related: as noted in the last paragraph of my comment here, we may also want the orchestrator to treat null value assignments such as 'bookmarks': ('my_stream': null} and 'partitions': {'key=1': null} as a signal that it can safely drop both the key and the value for the nullified key-value pair.