estuary / flow

🌊 Continuously synchronize the systems where your data lives, to the systems where you _want_ it to live, with Estuary Flow. 🌊
https://estuary.dev

captures: recast ingestion as a form of capture #179

Closed jgraettinger closed 2 years ago

jgraettinger commented 3 years ago

Currently we offer ingestion APIs (HTTP & WebSocket) through a flow-ingester component that's separate from the flow-reactor.

It has a couple of issues:

A really lightweight sketch:

We should model ingestion as a kind of capture, alongside Airbyte connectors.

jgraettinger commented 3 years ago

I'm looking to pick this up shortly. @psFried, as I recall you and I had a pinned conversation on this topic that I wanted to unpin before starting coding work.

At a high level, I'd like to excise "ingestion" as a user-facing Flow concept. Everything is just a capture. Every capture has a unique name, and has one of several possible types. For example:

Captures are already unique resources within Flow, so using the capture name as an HTTP path or Kafka topic, etc., is intended to allow for flexible but unique bindings between externally-facing addresses (topic names, HTTP paths, etc.) and the collections which currently back them.

Further, a capture can bind multiple collections, so there's still a means of allowing multi-collection transactional POSTs, for example, where the POST body names specific resource bindings of the capture and those resolve to the collections they currently bind. Or a sub-path of a capture's POST path could serve the same purpose, etc.
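As a very rough Go sketch of that resolution step (the request shape, binding names, and helper are illustrative assumptions, not an actual Flow API):

```go
// Hypothetical sketch: resolving a transactional POST body that names specific
// resource bindings of a capture into the collections those bindings target.
package main

import (
	"encoding/json"
	"fmt"
)

// ingestRequest is an assumed shape for a transactional POST body: documents
// grouped by the capture's resource binding names.
type ingestRequest struct {
	Bindings map[string][]json.RawMessage `json:"bindings"`
}

// resolveBindings maps named bindings in the request onto the collections the
// capture binds, rejecting names the capture doesn't define.
func resolveBindings(req ingestRequest, captureBindings map[string]string) (map[string][]json.RawMessage, error) {
	out := make(map[string][]json.RawMessage)
	for name, docs := range req.Bindings {
		collection, ok := captureBindings[name]
		if !ok {
			return nil, fmt.Errorf("capture has no binding named %q", name)
		}
		out[collection] = append(out[collection], docs...)
	}
	return out, nil
}

func main() {
	var req ingestRequest
	_ = json.Unmarshal([]byte(`{"bindings": {"page-views": [{"url": "/pricing"}]}}`), &req)

	// Bindings of a hypothetical capture acmeCo/marketing/site-events.
	resolved, err := resolveBindings(req, map[string]string{
		"page-views": "acmeCo/marketing/page-views",
		"clicks":     "acmeCo/marketing/clicks",
	})
	fmt.Println(resolved, err)
}
```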

psFried commented 3 years ago

I've been going back and forth on this topic for a while now, and I've landed on the same conclusion that ingestion is not the right concept, and that we should instead frame this more as "exposing" ports on the capture containers. Thinking longer term, there are probably also good reasons for exposing ports on derivation and materialization containers, too, and it would be nice if there were a sufficiently generic way of configuring the networking of containers that would allow for that.

It seems like we'll have to have some sort of proxy server that sits in front of everything and does authz and proxies requests to the right containers. And that proxy server will need to understand each protocol that it's proxying, and do routing based on the task name. When a user wishes to expose a port from their capture container, they'll need to specify the protocol (HTTP, Kafka, etc.), and that protocol will correspond to some concrete implementation in the proxy server that knows how to proxy it.
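For illustration, a minimal Go sketch of what such an HTTP-aware proxy might look like, assuming path-based routing and a placeholder backend lookup (none of these names are real Flow components):

```go
// Hypothetical gateway sketch: authenticate a request, map its path to a
// capture task name, and reverse-proxy it to whatever backend serves the task.
package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"strings"
)

// lookupBackend is a stand-in for resolving a task name to the address of the
// shard member currently serving it (e.g. via etcd watches or a route cache).
func lookupBackend(taskName string) (*url.URL, bool) {
	backends := map[string]string{
		"acmeCo/marketing/page-views": "http://10.0.0.7:8080",
	}
	if addr, ok := backends[taskName]; ok {
		u, _ := url.Parse(addr)
		return u, true
	}
	return nil, false
}

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("Authorization") == "" { // placeholder authz check
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		// Interpret the request path as the capture task name.
		taskName := strings.TrimPrefix(r.URL.Path, "/")
		backend, ok := lookupBackend(taskName)
		if !ok {
			http.NotFound(w, r)
			return
		}
		httputil.NewSingleHostReverseProxy(backend).ServeHTTP(w, r)
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```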

There's a minor wrinkle, because there's no way to always respect the key ranges of split capture shards when proxying requests. For example, a request that appends multiple documents would have multiple keys, which may fall into the ranges of different shards. The obvious answer there would be to allow for a different interpretation of "key" in the context of routing, instead of it always being the literal key of each document. Maybe we hash the value of some header? Does this need to be configurable? I'm curious to hear how you're thinking about this one.
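One hypothetical interpretation, sketched in Go (an FNV hash of some header value; the shard ranges and helper are assumptions, not a decided design):

```go
// Sketch of routing by a hashed header value: hash the header into a uint32
// and pick the split shard whose key range covers it.
package main

import (
	"fmt"
	"hash/fnv"
)

type shard struct {
	id               string
	keyBegin, keyEnd uint32 // inclusive key range covered by this split
}

// routeByHeader hashes a header value and returns the shard owning that key.
func routeByHeader(headerValue string, shards []shard) (string, bool) {
	h := fnv.New32a()
	h.Write([]byte(headerValue))
	key := h.Sum32()
	for _, s := range shards {
		if key >= s.keyBegin && key <= s.keyEnd {
			return s.id, true
		}
	}
	return "", false
}

func main() {
	shards := []shard{
		{"capture/00", 0x00000000, 0x7fffffff},
		{"capture/01", 0x80000000, 0xffffffff},
	}
	id, _ := routeByHeader("tenant-42", shards)
	fmt.Println("routed to", id)
}
```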

Another random aside is that there might be some benefits to using a hostname instead of the request path for routing. A capture task called acmeCo/marketing/page-views could be given the hostname of page-views.marketing.acmeCo.flow.io. For HTTP, this would give the capture connector greater autonomy over how it interprets HTTP requests, since the proxy would only need to care about the host and authz headers, and could ignore the path. It could also just be a routing mechanism that works for a wider variety of protocols. For example, we could use TLS client certificates and SNI to securely expose ports speaking arbitrary protocols, which could be really nice for things like exposing the nodejs debug port. But there again is the question of how you'd route to specific shards of a task that is split, so :man_shrugging: .

jgraettinger commented 3 years ago

It seems like we'll have to have some sort of proxy server that sits in front of everything and does authz and proxies requests to the right containers.

💯 I've been thinking about this too. We need something that brokers access to a data-plane: Gazette RPCs, Flow RPCs, and also serves push-based captures. It should be the only way into a data-plane, including from the control-plane.

HTTP / WebSocket / Kafka would first go to this thing (flow-gateway?), which does smart routing of capture / journal / shard RPCs. It's a generalized requirement -- not just for captures -- that it route requests intelligently to the appropriate backend using knowledge of the catalog / shards / journals through etcd watches.

For an HTTP/WS/Kafka capture, it's mapping the host or path to a capture, and from there to an assigned shard & member process serving it, to which it proxies the request. For that reason I don't think we need to expose extra ports or anything in the flow-reactor. Ideally we continue to use the existing Ingest RPC internally (we can plumb through headers), and all of the HTTP/WS/Kafka details live in the gateway component. flowctl develop would serve both flow-reactor and also this gateway for local development.
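A sketch of that two-step resolution, with assumed types rather than Flow's actual data structures:

```go
// Hypothetical routing table: capture task name -> its shard splits, and each
// split -> the member process currently assigned to serve it.
package main

import "fmt"

type split struct {
	keyBegin, keyEnd uint32 // key range owned by this split
	memberAddr       string // address of the reactor process assigned this split
}

// routingTable is what the gateway maintains, e.g. via etcd watches.
type routingTable map[string][]split

// resolve picks the member serving the split whose range covers key.
func (t routingTable) resolve(capture string, key uint32) (string, bool) {
	for _, s := range t[capture] {
		if key >= s.keyBegin && key <= s.keyEnd {
			return s.memberAddr, true
		}
	}
	return "", false
}

func main() {
	table := routingTable{
		"acmeCo/marketing/page-views": {
			{keyBegin: 0x00000000, keyEnd: 0x7fffffff, memberAddr: "10.0.0.7:9000"},
			{keyBegin: 0x80000000, keyEnd: 0xffffffff, memberAddr: "10.0.0.8:9000"},
		},
	}
	fmt.Println(table.resolve("acmeCo/marketing/page-views", 0x90000000))
}
```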

EDIT: Actually, it doesn't need to be watching Etcd. It can also use regular gRPC clients for these with a pb.DispatchBalancer gRPC load balancer and an LRU route cache. That'll work because all RPCs do automatic proxying with passed-back route updates, and we can do this for the Ingest RPC as well. That would make the gateway entirely stateless.
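A minimal sketch of such a route cache, using only standard-library types rather than Gazette's actual client API:

```go
// Hypothetical LRU route cache: journal or shard name -> last-known member
// address, updated from route information passed back on each response.
package main

import (
	"container/list"
	"fmt"
	"sync"
)

type routeCache struct {
	mu      sync.Mutex
	max     int
	order   *list.List               // front = most recently used
	entries map[string]*list.Element // name -> element holding [2]string{name, addr}
}

func newRouteCache(max int) *routeCache {
	return &routeCache{max: max, order: list.New(), entries: make(map[string]*list.Element)}
}

// Get returns the cached address for a name, marking it recently used.
func (c *routeCache) Get(name string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.entries[name]; ok {
		c.order.MoveToFront(el)
		return el.Value.([2]string)[1], true
	}
	return "", false
}

// Update records a route learned from a response, evicting the oldest entry.
func (c *routeCache) Update(name, addr string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.entries[name]; ok {
		el.Value = [2]string{name, addr}
		c.order.MoveToFront(el)
		return
	}
	c.entries[name] = c.order.PushFront([2]string{name, addr})
	if c.order.Len() > c.max {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.entries, oldest.Value.([2]string)[0])
	}
}

func main() {
	c := newRouteCache(1024)
	c.Update("acmeCo/marketing/page-views", "broker-2.internal:8080")
	fmt.Println(c.Get("acmeCo/marketing/page-views"))
}
```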

The obvious answer there would be to allow for a different interpretation of "key" in the context of routing, instead of it always being the literal key of each document.

I think that's right, and is already the case. The semantics of "key" in a capture shard are already up to the connector to define (does it reflect a chunk of S3 files? Kinesis shard ranges? Kafka partitions? etc.). Perhaps it's a random number assigned by the gateway to begin with, for uniform loading. We may come up with more valuable semantics later.

there might be some benefits to using a hostname instead of the request path for routing

Love this idea 👍. Probably it's an either/or, like S3 does it, to support flowctl develop as well.
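As a rough sketch of that either/or (the flow.io suffix and helper are hypothetical), a resolver could prefer the hostname mapping and fall back to path-style routing, e.g. for flowctl develop:

```go
// Hypothetical resolver: virtual-hosted style (page-views.marketing.acmeCo.flow.io)
// when the request arrives on a *.flow.io host, otherwise path style
// (/acmeCo/marketing/page-views), similar to S3's two addressing modes.
package main

import (
	"fmt"
	"net/http"
	"strings"
)

const gatewaySuffix = "flow.io" // assumed public suffix of the gateway

// resolveTask returns the capture task name addressed by an HTTP request.
func resolveTask(r *http.Request) (string, bool) {
	host := strings.Split(r.Host, ":")[0] // drop any port
	if strings.HasSuffix(host, "."+gatewaySuffix) {
		// Virtual-hosted style: reverse the subdomain labels into a task path.
		labels := strings.Split(strings.TrimSuffix(host, "."+gatewaySuffix), ".")
		for i, j := 0, len(labels)-1; i < j; i, j = i+1, j-1 {
			labels[i], labels[j] = labels[j], labels[i]
		}
		return strings.Join(labels, "/"), true
	}
	// Path style, as used under local development.
	if name := strings.Trim(r.URL.Path, "/"); name != "" {
		return name, true
	}
	return "", false
}

func main() {
	r, _ := http.NewRequest("POST", "http://localhost:8080/acmeCo/marketing/page-views", nil)
	name, _ := resolveTask(r)
	fmt.Println(name) // acmeCo/marketing/page-views
}
```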