airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

proposals for the connector specification #5084

Open jgraettinger opened 2 years ago

jgraettinger commented 2 years ago

Hi team Airbyte!

tl;dr: We (team Estuary) want to work with you to evolve the Airbyte spec into a community standard for building data connectors.

Things we especially like about your improvements to the Singer spec are the schema-driven configuration discovery workflow and the use of Docker for packaging and discovery of connectors (rather than PyPI).

The protocol is adaptable to continuous / low-latency streaming contexts, which is a particular focus for Flow. We've worked to integrate the "source" protocol into Flow, and have also worked up some OSS source connectors for various technical systems (S3 & GCS with pattern matching, incremental bucket scans, CSV, and compression handling ; Kinesis ; Kafka soon).

In the process we've implemented and experimented with several extensions to the spec we wanted to talk through and propose for inclusion. I've bundled these in this Github issue, but they're largely separable and can be discussed/evolved/adopted independently.

Proposals

  1. Allow a connector to produce multiple states over its lifetime.

    A connector invocation is permitted to produce many STATE messages over its lifetime. Each STATE is a checkpoint covering the RECORDs which preceded it.
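To make the checkpointing semantics concrete, here is a minimal sketch (message shapes are illustrative, not the exact Airbyte wire format): each STATE checkpoints every RECORD emitted before it, so the most recent STATE is the resume point after a crash.

```python
# A hypothetical stream of connector output messages. Each STATE
# checkpoints all RECORDs that preceded it in the stream.
messages = [
    {"type": "RECORD", "record": {"id": 1}},
    {"type": "RECORD", "record": {"id": 2}},
    {"type": "STATE", "state": {"cursor": 2}},  # checkpoints records 1-2
    {"type": "RECORD", "record": {"id": 3}},
    {"type": "STATE", "state": {"cursor": 3}},  # checkpoints record 3
]

def latest_checkpoint(msgs):
    """Return the most recent STATE: the point to resume from on restart."""
    states = [m["state"] for m in msgs if m["type"] == "STATE"]
    return states[-1] if states else None

print(latest_checkpoint(messages))
```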

  2. Add optional tail to ConfiguredCatalog

    tail is a boolean which tells the connector that it may run indefinitely, producing new RECORDs and STATEs as new data becomes available.

    If false, the connector should produce available RECORDS and then exit, as is the current behavior. tail is optional (assumed to be false if missing), and connectors are not required to do anything with it.

    A Kafka connector might use tail to determine whether it should consume indefinitely, producing new RECORDs as they arrive, or should read through offsets determined at startup and then exit.
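A sketch of how a connector's read loop might honor a tail flag (names here are illustrative; `fetch_batch` is a hypothetical callable returning the next batch of records, or an empty list once caught up):

```python
import time

def read_stream(fetch_batch, tail, poll_interval=1.0):
    """Yield records, honoring the proposed `tail` flag.

    With tail=False, drain the available data and exit (current behavior).
    With tail=True, keep polling for new data indefinitely.
    """
    while True:
        batch = fetch_batch()
        yield from batch
        if not batch:
            if not tail:
                return                    # non-tailing: caught up, so exit
            time.sleep(poll_interval)     # tailing: wait for new data forever

# Non-tailing usage: drain two batches, then exit once caught up.
batches = iter([[1, 2], [3]])
records = list(read_stream(lambda: next(batches, []), tail=False))
```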

  3. Output STATEs are reduced using RFC 7386 Merge PATCH.

    As currently specified, each STATE produced across connector invocations is a complete replacement of a prior STATE.

    Consider an AWS Kinesis source which is capturing from many underlying Kinesis shards, where the connector STATE encapsulates separate read offsets for each shard.

    Today, when any one shard has data, the connector emits its RECORDs and then must re-emit the complete STATE of every shard. Under this proposal, it could instead write a STATE that reflects just the new shard data. The client of the connector then knows to PATCH in that delta STATE to update the complete STATE.

    One option is to say that all STATEs are PATCH merges, which changes the assumed behavior of a number of existing connectors. This may be backwards-compatible in practice, given how JSON Merge Patch works and the stable of existing Airbyte connectors, but we're not certain of that.

    Another option might be to add a patch bool property to the STATE message that toggles this behavior.
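For reference, RFC 7386 merge semantics fit in a few lines: objects merge recursively, a null deletes a key, and anything else replaces the target wholesale. A minimal sketch applied to the Kinesis example above:

```python
def merge_patch(target, patch):
    """Apply an RFC 7386 JSON Merge Patch: objects merge recursively,
    null (None) deletes a key, any other value replaces wholesale."""
    if not isinstance(patch, dict):
        return patch
    if not isinstance(target, dict):
        target = {}
    result = dict(target)
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)
        else:
            result[key] = merge_patch(result.get(key), value)
    return result

# The full STATE tracks offsets for two Kinesis shards; a delta STATE
# updates only the shard that actually received data.
full = {"shard-0": {"offset": 10}, "shard-1": {"offset": 7}}
delta = {"shard-1": {"offset": 12}}
merged = merge_patch(full, delta)
```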

  4. Add optional range to ConfiguredCatalog

    range is a pair of {begin, end} inclusive uint32 integers which tell each connector invocation what it's responsible for. range facilitates connector parallelism: each invocation can consult its range to determine whether it should do a unit of work, or whether one of its peers will.

    The exact usage of range is up to the connector.

    • A GCS / S3 connector might hash each file name into a uint32.
    • A Kafka connector might hash each partition.
    • Our Kinesis connector determines the overlap between the Kinesis shard's range and that of the connector.

    Examples of ranges:

    • {"begin":"00000000","end":"ffffffff"} : connector has only one invocation, covering the full range
    • {"begin":"00000000","end":"7fffffff"}, {"begin":"80000000","end":"ffffffff"} : connector has two invocations, each with 1/2 of the range.

    If not specified, range is assumed to be {"begin":"00000000","end":"ffffffff"}.

    A particular rationale for range (over, say, worker count and index) is that it allows for dynamic scale-out without breaking existing STATEs. A range and its prior STATE can be subdivided into new ranges, each starting from the common parent STATE.
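The hash-based ownership check described above can be sketched as follows (the hash choice is an assumption for illustration; any stable 32-bit hash works, and CRC-32 stands in here):

```python
import zlib

FULL_RANGE = (0x00000000, 0xFFFFFFFF)

def owns(name, begin, end):
    """An invocation claims an item iff the item's uint32 hash falls
    within the invocation's inclusive [begin, end] range."""
    h = zlib.crc32(name.encode()) & 0xFFFFFFFF
    return begin <= h <= end

# Two invocations splitting the key space in half partition the files
# between them; a single invocation with the full range claims everything.
files = ["a.csv", "b.csv", "c.csv", "d.csv"]
lower = [f for f in files if owns(f, 0x00000000, 0x7FFFFFFF)]
upper = [f for f in files if owns(f, 0x80000000, 0xFFFFFFFF)]
```

Because the two half-ranges are disjoint and cover the full uint32 space, every file lands in exactly one invocation, and subdividing a range later reassigns work without invalidating hashes already recorded in STATE.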

  5. Add optional projections to ConfiguredStream

    projections is a map of {field name: JSON Pointer}.

    Many sources of data are tabular in nature (SQL tables, CSVs), while connectors work with JSON documents. A reasonable projection between these models is to map table columns into properties of a root document object. But it's not the only one!

    projections inform the connector of an alternative location where a given field should be inserted into a RECORD document.

    To give a grounded use case, Citi Bike provides (somewhat messy) CSVs of their system data. Our S3 connector is able to use projections to map CSV columns into more natural document locations in the RECORDs it produces.
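A minimal sketch of applying such a projection map (column names and pointers are hypothetical; this also skips JSON Pointer token escaping of `~0`/`~1` for brevity):

```python
def apply_projections(row, projections):
    """Place each tabular field at the JSON-Pointer location named by its
    projection, building a nested RECORD document from a flat row."""
    doc = {}
    for field, pointer in projections.items():
        parts = pointer.lstrip("/").split("/")
        node = doc
        for part in parts[:-1]:
            node = node.setdefault(part, {})  # create intermediate objects
        node[parts[-1]] = row[field]
    return doc

# Hypothetical Citi Bike CSV columns remapped into a nested document.
row = {"start station id": 72, "start station name": "W 52 St"}
projections = {
    "start station id": "/start/station/id",
    "start station name": "/start/station/name",
}
doc = apply_projections(row, projections)
```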

  6. Adopting community-oriented naming for the protocol

    It will be challenging to build multi-company momentum behind a specification named for one company.

    It's admittedly bland, but would you be open to something like the "Open Connector Specification" (or do you have better ideas)?

cgardens commented 2 years ago

Hi @jgraettinger!

Thank you for posting this issue. We'd love to work together on developing this standard!

Here is how we are thinking about some of the points you raised.

  1. Allow a connector to produce multiple states over its lifetime.

Most connectors do emit multiple STATE messages over their lifetime. The Airbyte job runner only records the last emitted STATE message (discussed more in #2), but connectors can emit STATE messages incrementally.

Let me know if there is something I'm not understanding with regard to how connectors interact / emit state. Also helpful if the feedback is just that this was not clear from our docs.

  2. Add optional tail to ConfiguredCatalog

I agree that we should add first-class support for streaming connectors that are designed to run indefinitely. I'd like to explore a little more how to make this intuitive.

The way the Airbyte implementation of the protocol works right now, there is (mostly) nothing that prevents a connector from running indefinitely. We do eventually stop a sync job if it has both run for a long time and received no data for a long time. It will not stop, though, as long as the connector keeps emitting something. So as long as the connector emits a STATE message periodically (even if no data has come in), it can run forever.

So what I'm still thinking through is whether this belongs in the ConfiguredCatalog or if part of the protocol is to say that as long as a connector is emitting messages, it should be allowed to run.

Would love your perspective on what would be most intuitive to you in this regard.

  3. Output STATEs are reduced using RFC 7386 Merge PATCH.

Agreed.

  4. Add optional range to ConfiguredCatalog

Similar to #2, this concept fits into how we are thinking about evolving the protocol. The goal here, as you mentioned, is to pass a range to an instantiation of a connector so that it can operate on a shard of the data.

I agree that this needs to be an argument passed to a connector at runtime. Thus it needs to be passed into this interface somehow. I'm less sure if it makes sense to put this in ConfiguredCatalog, but I think this is just a detail of how we structure the standard.

Naively, instead of:

read(Config, AirbyteCatalog, State) -> Stream<AirbyteMessage>

we want:

read(Config, AirbyteCatalog, State, Range) -> Stream<AirbyteMessage>
  5. Add optional projections to ConfiguredStream

This aligns really well with our vision. Our preference is to surface this operation separately from the ConfiguredStream, as a mapper task. We are speccing this out now and will share a proposal for what it will look like, to make sure it covers your case.

  6. Adopting community-oriented naming for the protocol

Good point! We will brainstorm about this idea.

psFried commented 2 years ago

Thanks, this was an interesting and informative response. I think some of these proposals seem to warrant issues of their own, so that we can discuss further without mixing conversations. We're both out on vacation for another week, but I'm looking forward to discussing further after I'm back. Would it be ok for us to split off some separate issues then to resolve the discussions around proposals 2-6?

psFried commented 2 years ago

Some more thoughts on these:

  1. Allow a connector to produce multiple states over its lifetime.

Yeah, I think we just didn't get the right understanding from the docs on this. I took another look at these docs and they don't seem to specify one way or the other. We can just file a docs issue to clear that up.

  2. Add optional tail to ConfiguredCatalog

I think the issue here is that every connector that can run in tailing (blocking) mode can also support reading all available data and then exiting (non-blocking mode), and we want the runtime to determine which mode is used. Take the source-kinesis connector, for example. In tailing mode, it will never exit normally. In non-tailing mode, it will read and emit all the records up through the current wall clock time, and then exit. We want our runtime to tell the connectors which mode to run in because it supports a mode of running our CLI where the runtime itself can exit once it has processed all the available data.

  3. Output STATEs are reduced using RFC 7386 Merge PATCH.

Excellent! What's the next step with this? Can we just create a docs issue? Do we need to do any additional problem solving to ensure that existing connectors don't break due to the new behavior?

  4. Add optional range to ConfiguredCatalog

I don't think we really care exactly how this is passed, but the ConfiguredCatalog does seem like the most sensible place. Adding it to the State or Config seems just out of place. And adding a new CLI option seems like it would make backward compatibility a problem since most argument parsers would probably error on encountering an unrecognized --range. Maybe the spec can/should specify that unknown arguments are ignored?

  5. Add optional projections to ConfiguredStream

This one is really interesting, and I'd love to hear some other perspectives on it. My perspective is that since the connector is already responsible for converting tabular (or whatever kind of) data into JSON, it might as well use the desired field names. Otherwise, you need to re-parse and re-shape the JSON as a separate step. That might seem appropriate to do separately, given that it's likely a capability you'll want anyway, but I think it's really preferable to have connectors do it because it minimizes the changes to pipelines when switching from one source and/or format to another. It also potentially avoids re-parsing and re-shaping the JSON, so it's at least theoretically more efficient.