OpenFn / adaptors

The new home for OpenFn adaptors; re-usable connectors for the most common DPGs and DPI building blocks.
GNU General Public License v3.0

HTTP Streaming Support #370

Open josephjclark opened 1 year ago

josephjclark commented 1 year ago

This is a cross-repo epic to introduce better support for streaming APIs into our adaptors and runtime. I don't know how to structure this so here's a brain dump.

Streaming Design

In order to support streams for more efficient data processing, we need to make a number of changes.

From a very high level, I would like the runtime and adaptors to understand streams as a first-class data type. In fact, I want arrays and streams to behave exactly the same way, and for the streaming layer to be almost seamless.

There is a whole host of considerations here and for now I'm just going to type them out.

Streams on state

It would be super helpful to write streams to state.

Obviously a stream doesn't serialize unless we explicitly write it, so by default it breaks the design. Probably right now the new runtime will drop it in fast-safe-stringify (I don't actually know).

It should be perfectly safe to pass a stream between jobs in a workflow (so long as the workflow runs in the same process, which so far it is designed to).

If there are any streams on the final state (and they are not exhausted), we should serialise them to text.
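As a rough sketch of that last step, assuming Node's built-in `text` consumer from `node:stream/consumers` is acceptable: the helper name `serialiseStreams` is hypothetical, it only looks at top-level keys of state, and it leaves already-exhausted streams untouched.

```javascript
const { Readable } = require('node:stream');
const { text } = require('node:stream/consumers');

// Hypothetical helper (not part of the runtime): replace any unread
// Readable on the top level of state with its text content, so the
// final state can be written out as JSON.
async function serialiseStreams(state) {
  const result = {};
  for (const [key, value] of Object.entries(state)) {
    const isUnreadStream = value instanceof Readable && !value.readableEnded;
    // text() reads the stream to the end and resolves with a string
    result[key] = isUnreadStream ? await text(value) : value;
  }
  return result;
}
```

Calling `await serialiseStreams(finalState)` before stringifying would turn a leftover CSV stream into a plain string; a real implementation would probably need to walk nested objects too.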

Detecting a Stream is a little tricky - we should ensure that any streams we use match instanceof Readable (the fetch body does not, for example).
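One possible way to normalise things, sketched under the assumption that we run on Node 18+ (global `ReadableStream`, and `Readable.fromWeb` available). The helper name `toReadable` is made up for illustration.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: return a Node Readable for anything stream-like,
// or null if the value is not a stream at all.
function toReadable(value) {
  if (value instanceof Readable) return value;
  // Web streams (such as a fetch response body) are not Node Readables,
  // but Readable.fromWeb can wrap them so instanceof checks pass later.
  if (typeof ReadableStream !== 'undefined' && value instanceof ReadableStream) {
    return Readable.fromWeb(value);
  }
  return null;
}
```

An adaptor could run every response body through something like this before writing it to state, so the runtime only ever has to check `instanceof Readable`.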

This is all a bit easier if we support LazyArrays (see below).

This would have helped us recently in a case where we had to stream a large CSV file to JSON, serialize the JSON to a larger file than the original CSV, and then re-load that JSON in the next job, blowing platform memory limits. Forwarding the stream to the next job would have really helped.

Transformers / parsers

The big problem with a stream is: how do you convert the array buffer into something meaningful? Actually for us it's always: how do we convert the underlying data to JSON?

We probably need a common pipe operation. This would not be seamless, but would allow us to convert a raw response into a JSON or CSV stream. Adaptors may choose to provide seamless APIs where possible.

Parsing a http get might look something like this:

get(www, { parseAs: 'stream' }) // fetches data as a stream, writes to state.data
pipe(state.data, jsonArray) // Note that it doesn't start streaming yet - just adds to the pipeline
each('data', fn(() => { // iterate over the stream
    // this receives json objects
}));

We would have to provide a bunch of helper transformer functions for common types, and allow users to define their own. Maybe common exports a stream namespace.
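To give a feel for what one of those helpers might look like, here is a sketch of a transformer built on Node's `Transform`. Note this swaps in newline-delimited JSON rather than the `jsonArray` case above, because parsing a JSON array incrementally needs a proper streaming parser. The name `ndjsonParse` is hypothetical.

```javascript
const { Transform } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');

// Hypothetical transformer: newline-delimited JSON text in, parsed
// objects out. Assumes one JSON document per line.
function ndjsonParse() {
  const decoder = new StringDecoder('utf8'); // copes with split multi-byte chars
  let buffered = '';

  // Parse every non-empty line and push the object downstream
  const pushLines = (stream, lines) => {
    for (const line of lines) {
      if (line.trim()) stream.push(JSON.parse(line));
    }
  };

  return new Transform({
    readableObjectMode: true, // bytes in, objects out
    transform(chunk, _encoding, done) {
      const lines = (buffered + decoder.write(chunk)).split('\n');
      buffered = lines.pop(); // last entry may be an incomplete line
      try { pushLines(this, lines); done(); } catch (err) { done(err); }
    },
    flush(done) {
      // Whatever is left when the input ends is treated as the final line
      try { pushLines(this, [buffered + decoder.end()]); done(); } catch (err) { done(err); }
    },
  });
}
```

Usage would be along the lines of `someReadable.pipe(ndjsonParse())`, after which each item read from the result is a plain object.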

LazyArrays

I have a bit of an idea that a Stream is just a lazy array, where we only load the bit we're using into working memory.

I envision a wrapper around Stream, a LazyArray but probably not actually called that, which presents an array-like interface:

const a = new LazyArray(someStream)
const someMappedLazyArray = a.map(() => {... } );
const someFilteredLazyArray = a.filter(() => {... } );
a.pipe(someStreamHandler)

These are not operations.

We should support each, filter, map.

This does assume the underlying stream represents a JSON array. It might be useful to give it a base transformer pipeline to do that, so you can pass a CSV through:

const a = new LazyArray(response.body, csvParse)

This would pipe through csvParse before calling filter, enabling the underlying stream to not be JSON.

One benefit of this is that the runtime can understand what a LazyArray is and give it special treatment on state.

This is all a bit aspirational and needs more thought.
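A very rough sketch of the wrapper, assuming it is built on async iteration (Node Readables are async iterable). Everything here is hypothetical, including the class name and the optional second constructor argument for a base transformer.

```javascript
// Hypothetical LazyArray: wraps any async iterable and defers all work
// until something actually consumes the data.
class LazyArray {
  constructor(source, transform) {
    // Optional base transformer (e.g. a CSV parser) applied before map/filter
    this.source = transform ? source.pipe(transform) : source;
  }

  // Returns a new LazyArray; nothing is read from the source yet
  map(fn) {
    const source = this.source;
    return new LazyArray((async function* () {
      for await (const item of source) yield fn(item);
    })());
  }

  // Returns a new LazyArray containing only items where fn is truthy
  filter(fn) {
    const source = this.source;
    return new LazyArray((async function* () {
      for await (const item of source) {
        if (fn(item)) yield item;
      }
    })());
  }

  // Consumes the source, invoking fn once per item
  async each(fn) {
    for await (const item of this.source) await fn(item);
  }
}
```

With this shape, `new LazyArray(stream).filter(f).map(g)` only sets up the chain; items are pulled through one at a time when `each` runs, so only the current item sits in working memory.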

One difficulty is that once a stream has been read, it's exhausted. So I can't do:

a.forEach()
a.filter()

Or perhaps more pertinently:

a.forEach()
a.forEach()

I don't know whether we should try and code around this or accept it as a fact of life (probably the latter). This is a moment where the stream API is not seamless and you may have to write job code differently.
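If we do accept it, one option is to at least fail loudly rather than silently iterate over nothing. A minimal sketch, with hypothetical helper names, relying on the `readableEnded` and `destroyed` properties of Node Readables:

```javascript
// Hypothetical guard: throw a clear error if the stream was already read
function assertNotExhausted(stream) {
  if (stream.readableEnded || stream.destroyed) {
    throw new Error('Stream already consumed: a stream can only be read once');
  }
  return stream;
}

// Reads every item from the stream into an array (for illustration only,
// since buffering everything defeats the point of streaming)
async function consume(stream) {
  const items = [];
  for await (const item of assertNotExhausted(stream)) items.push(item);
  return items;
}
```

The first `consume(stream)` call returns the items; a second call on the same stream rejects with the error above, which gives job authors a hint that the code needs restructuring.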

each

Ultimately I think every adaptor function needs to be studied to ask "can this support streaming"?

An obvious candidate is each. It should accept a stream (or a jsonpath to a stream) and be able to iterate over the stream, invoking the callback.

The stream MUST represent a JSON array, so it may need to be pre-piped.

Internally it needs to recognise the iterable is a stream and may need to use a different syntax to actually handle the iteration. If the iterable is a LazyArray then I think this would actually be seamless?
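For what it's worth, `for await` accepts arrays, Node Readables and other async iterables alike, so the iteration itself may not need two code paths. A standalone sketch (this is not the real `each` operation from common, which takes a path and an operation and works on state):

```javascript
// Hypothetical iteration core: one loop for arrays and streams.
// for await falls back to the sync iterator for plain arrays.
async function each(iterable, callback) {
  let index = 0;
  for await (const item of iterable) {
    await callback(item, index++);
  }
}
```

So `each([1, 2], fn)` and `each(someObjectStream, fn)` would both invoke `fn` once per item, in order, waiting for each callback to finish before pulling the next item.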

For the record I'd also like a map and filter function which return to state.data. Or just a map which removes elements if you return null. But that's a different story.

josephjclark commented 1 year ago

I notice that Salesforce's jsforce library has a streaming API - we should look at that and consider how it might work with the SF adaptor.