meltano / sdk

Write 70% less code by using the SDK to build custom extractors and loaders that adhere to the Singer standard: https://sdk.meltano.com
Apache License 2.0

feat: Allow an inline stream map to set output stream name dynamically #2502

Status: Open. menzenski opened this issue 2 weeks ago.

menzenski commented 2 weeks ago

Feature scope

Other

Description

See this thread (and this message in particular) in the Meltano Slack for more context.

Here's the scenario I'd like to be able to implement:

Given a tap that produces messages to a generic database_records stream, where each record has a namespace object with database and collection properties (for example, "namespace": {"database": "customer_service", "collection": "Customer"}), I'd like to dynamically split the database_records stream into many streams, one for each combination of namespace.database and namespace.collection values. For example, the record with "namespace": {"database": "customer_service", "collection": "Customer"} should be mapped to a new stream with stream_id customer_service-Customer (the hyphenated name lets us take advantage of handling in the target to write this record to a specific table).
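The naming rule in this scenario can be sketched as a small Python function. output_stream_id is a hypothetical helper, not an SDK API, and it assumes every record carries a namespace object with both database and collection keys.

```python
def output_stream_id(record: dict) -> str:
    """Derive the output stream_id from a record's namespace object.

    Hypothetical helper: assumes every record has a "namespace" dict
    containing "database" and "collection" keys.
    """
    namespace = record["namespace"]
    # Hyphenate database and collection so the target can route the
    # record to a specific table.
    return f"{namespace['database']}-{namespace['collection']}"


record = {"namespace": {"database": "customer_service", "collection": "Customer"}}
print(output_stream_id(record))  # customer_service-Customer
```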

edgarrmondragon commented 2 weeks ago

This is even tougher than I imagined, and I think you were hinting at this in Slack. The way stream maps currently work is by generating:

- 1 SCHEMA message per stream map
- 1 RECORD message per record per stream map

So if a stream with 10 records has 2 stream maps applied to it, then the resulting mapped stream will emit:

- 2 SCHEMA messages
- 20 RECORD messages

Now, in the current implementation, the generated SCHEMA messages don't depend on the individual records, only on the stream map expression. With this proposal, however, SCHEMA messages would also need to be dynamic, since the stream alias would depend on the contents of each individual record. This change would therefore require some non-trivial refactoring of the stream maps implementation.
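To make the contrast concrete, here is a simplified Python model of how static stream maps emit messages. It is a sketch under stated assumptions, not the SDK's code: apply_static_maps and the aliases map_a and map_b are hypothetical, each map passes records through unchanged, and all SCHEMA messages are emitted before any RECORD. Because each schema depends only on its map, it can be emitted up front; a stream name that depends on record contents breaks exactly that property.

```python
from collections import Counter


def apply_static_maps(
    schema: dict, records: list[dict], aliases: list[str]
) -> list[dict]:
    """Simplified model of static stream maps (not the SDK implementation).

    Each alias stands for one stream map. SCHEMA messages depend only on
    the map, never on record contents, so they are all emitted up front.
    """
    messages = [
        {"type": "SCHEMA", "stream": alias, "schema": schema, "key_properties": []}
        for alias in aliases
    ]
    for record in records:
        for alias in aliases:
            # A real stream map would also transform the record here.
            messages.append({"type": "RECORD", "stream": alias, "record": record})
    return messages


schema = {"type": "object", "properties": {"id": {"type": "integer"}}}
records = [{"id": i} for i in range(10)]
messages = apply_static_maps(schema, records, aliases=["map_a", "map_b"])
print(dict(Counter(message["type"] for message in messages)))
# {'SCHEMA': 2, 'RECORD': 20}
```

With 10 records and 2 maps this yields 2 SCHEMA and 20 RECORD messages, matching the example of a stream with 10 records and 2 stream maps.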

Any implementation of a "stream splitter" like this, whether or not it is based on SDK stream maps, would have to perform the following transformation:

Original Singer output:

{"type": "SCHEMA", "stream": "tenant_resources", "schema": {"properties": {"tenant_id": {"type": "string"}, "resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_001", "resource": "resource_A"}}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_002", "resource": "resource_A"}}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_001", "resource": "resource_B"}}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_002", "resource": "resource_B"}}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_001", "resource": "resource_C"}}

Transformed output based on the tenant_id property:

{"type": "SCHEMA", "stream": "tenant_001-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_A"}}
{"type": "SCHEMA", "stream": "tenant_002-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_002-resources", "record": {"resource": "resource_A"}}
{"type": "SCHEMA", "stream": "tenant_001-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_B"}}
{"type": "SCHEMA", "stream": "tenant_002-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_002-resources", "record": {"resource": "resource_B"}}
{"type": "SCHEMA", "stream": "tenant_001-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_C"}}
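The naive transformation above can be sketched in Python. split_stream and its split_key and suffix parameters are hypothetical names chosen to reproduce this example; the sketch assumes a single input stream whose SCHEMA message arrives before its records, and it emits a SCHEMA message before every RECORD because the output stream name is only known once the record has been read.

```python
import json


def split_stream(messages: list[dict], split_key: str, suffix: str) -> list[dict]:
    """Naively split one Singer stream into one stream per split_key value.

    Hypothetical sketch (not an SDK API): output streams are named
    "<value>-<suffix>", split_key is removed from the schema and records,
    and a SCHEMA message precedes every RECORD.
    """
    schema: dict = {}
    key_properties: list[str] = []
    output = []
    for message in messages:
        if message["type"] == "SCHEMA":
            # Keep the source schema, minus the property used for splitting.
            properties = {
                name: spec
                for name, spec in message["schema"]["properties"].items()
                if name != split_key
            }
            schema = {"properties": properties, "type": "object"}
            key_properties = [k for k in message["key_properties"] if k != split_key]
        elif message["type"] == "RECORD":
            record = dict(message["record"])
            stream = f"{record.pop(split_key)}-{suffix}"
            # The stream name depends on the record, so emit its schema now.
            output.append(
                {"type": "SCHEMA", "stream": stream, "schema": schema,
                 "key_properties": key_properties}
            )
            output.append({"type": "RECORD", "stream": stream, "record": record})
        else:
            # STATE and other message types pass through unchanged.
            output.append(message)
    return output


source = [
    {
        "type": "SCHEMA",
        "stream": "tenant_resources",
        "schema": {
            "properties": {
                "tenant_id": {"type": "string"},
                "resource": {"type": "string"},
            },
            "type": "object",
        },
        "key_properties": [],
    }
] + [
    {"type": "RECORD", "stream": "tenant_resources",
     "record": {"tenant_id": tenant, "resource": resource}}
    for tenant, resource in [
        ("tenant_001", "resource_A"),
        ("tenant_002", "resource_A"),
        ("tenant_001", "resource_B"),
        ("tenant_002", "resource_B"),
        ("tenant_001", "resource_C"),
    ]
]

for message in split_stream(source, split_key="tenant_id", suffix="resources"):
    print(json.dumps(message))
```

Running it prints the ten transformed messages listed above, including the repeated SCHEMA messages.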

Notice that, because records arrive in arbitrary order, a SCHEMA message may be emitted multiple times for the same stream. With some smart caching, we might be able to simplify this to:

{"type": "SCHEMA", "stream": "tenant_001-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_A"}}
{"type": "SCHEMA", "stream": "tenant_002-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_002-resources", "record": {"resource": "resource_A"}}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_B"}}
{"type": "RECORD", "stream": "tenant_002-resources", "record": {"resource": "resource_B"}}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_C"}}
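One possible form of that caching is a pass that drops a SCHEMA message when it is identical to the last one sent for the same stream. dedupe_schemas is a hypothetical name, and the sketch rebuilds the naive tenant_id split output as its input. Comparing the full schema and key_properties, rather than only the stream name, means a genuine schema change for a stream would still be forwarded to the target.

```python
import json


def dedupe_schemas(messages: list[dict]) -> list[dict]:
    """Drop SCHEMA messages that repeat the last schema sent for a stream.

    Hypothetical sketch: a SCHEMA message is kept the first time a stream
    appears, or when its schema or key_properties change.
    """
    last_sent: dict[str, str] = {}
    output = []
    for message in messages:
        if message["type"] == "SCHEMA":
            # Serialize deterministically so equal schemas compare equal.
            fingerprint = json.dumps(
                [message["schema"], message["key_properties"]], sort_keys=True
            )
            if last_sent.get(message["stream"]) == fingerprint:
                continue  # Identical schema already sent for this stream.
            last_sent[message["stream"]] = fingerprint
        output.append(message)
    return output


# Rebuild the naive output: a SCHEMA message before every RECORD.
schema = {"properties": {"resource": {"type": "string"}}, "type": "object"}
naive = []
for tenant, resource in [
    ("tenant_001", "resource_A"),
    ("tenant_002", "resource_A"),
    ("tenant_001", "resource_B"),
    ("tenant_002", "resource_B"),
    ("tenant_001", "resource_C"),
]:
    stream = f"{tenant}-resources"
    naive.append({"type": "SCHEMA", "stream": stream, "schema": schema, "key_properties": []})
    naive.append({"type": "RECORD", "stream": stream, "record": {"resource": resource}})

deduped = dedupe_schemas(naive)
for message in deduped:
    print(json.dumps(message))
```

This prints the seven messages of the simplified output: one SCHEMA per tenant stream, followed by the remaining RECORD messages.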