meltano / sdk

Write 70% less code by using the SDK to build custom extractors and loaders that adhere to the Singer standard: https://sdk.meltano.com
Apache License 2.0

Make "internal stream map" transformations available to developers #1231

Open aaronsteers opened 1 year ago

aaronsteers commented 1 year ago

[stub]

ReubenFrankel commented 1 year ago

As an SDK user, I would like to leverage stream maps from Stream classes in a developer-friendly way.

Currently, you can implement stream_maps for a stream, but it requires that you define transformation mappings as strings that are later interpreted internally by simpleeval (there is some nuance to this, and I wasn't able to get it working for our use case yet). The code required to achieve this is close to how you might define a stream map in JSON; as a result it is quite long-winded, and the mapping expressions do not benefit from the syntactic/semantic validation an IDE provides.

class LogsStream(Auth0Stream):
    """Define logs stream."""

    name = "stream_auth0_logs"
    path = "/logs"
    primary_keys = ["log_id"]
    replication_key = "log_id"
    schema = LogObject.schema
    log_expired = False
    stream_maps = [
        CustomStreamMap(
            name,
            None,
            schema,
            primary_keys,
            {
                "scope": "record.get('scope') and record['scope'].split()",
            },
            None,
        )
    ]
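For comparison, this is roughly what the equivalent string-based stream map looks like when declared in tap/Meltano configuration (a hypothetical sketch reusing the stream and property names and the exact expression string from the code above; precise config syntax depends on your setup):

```yaml
stream_maps:
  stream_auth0_logs:
    scope: record.get('scope') and record['scope'].split()
```

The expression is still just a string evaluated by simpleeval at runtime, which is exactly what an IDE cannot validate.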

As a POC, I started implementing what this might eventually look like as a record_transforms property:

class LogsStream(Auth0Stream):
    """Define logs stream."""

    name = "stream_auth0_logs"
    path = "/logs"
    primary_keys = ["log_id"]
    replication_key = "log_id"
    schema = LogObject.schema
    log_expired = False

    def jwt_configuration_exp_to_expires_in(self, value: str, record: dict):
        # `record` here is the containing (possibly nested) object, so
        # `expires_in` is looked up as a sibling of `exp`
        return record["expires_in"]

    @property
    def record_transforms(self):
        return {
            "scopes": {     # scopes is defined as `ArrayType` in `LogObject` schema
                str: lambda value, record: value.split(),     # function to apply if value is of type `str`
                list: lambda value, record: [v.lower() for v in value],     # function to apply if value is of type `list`
            },
            "jwt_configuration": {
                "exp": {    # nested properties supported
                    str: self.jwt_configuration_exp_to_expires_in
                }
            }
        }

record_transforms is applied in Stream.post_process:

CoercionMapType = Dict[str, Union["CoercionMapType", Dict[type, Callable[[Any, dict], Any]]]]

...

class Stream(metaclass=abc.ABCMeta):
    """Abstract base class for tap streams."""

    ...

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        """As needed, append or transform raw data to match expected structure.

        Optional. This method gives developers an opportunity to "clean up" the results
        prior to returning records to the downstream tap - for instance: cleaning,
        renaming, or appending properties to the raw record result returned from the
        API.

        Developers may also return `None` from this method to filter out
        invalid or not-applicable records from the stream.

        Args:
            row: Individual record in the stream.
            context: Stream partition or context dictionary.

        Returns:
            The resulting record dict, or `None` if the record should be excluded.
        """

        return _apply_record_transforms(row, self.record_transforms)

    @property
    def record_transforms(self) -> dict[str, CoercionMapType]:
        """A map defining how record property values of a given type should be transformed.

        Returns:
            A `dict` mapping of property keys to nested property keys or a mapping of value match types to transform functions.
        """
        return {}

def _apply_record_transforms(obj: dict, coercion_map: CoercionMapType):
    for key, value in coercion_map.items():
        obj_value = obj.get(key)

        if obj_value is None:
            continue

        # recurse into nested property mappings
        if any(isinstance(v, dict) for v in value.values()):
            _apply_record_transforms(obj_value, value)
            continue

        # skip values whose type has no registered transform
        transform_fn = value.get(type(obj_value))
        if transform_fn is None:
            continue

        obj[key] = transform_fn(obj_value, obj)

    return obj
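To make the intended behaviour concrete, here is a self-contained, runnable sketch of the same idea (a hypothetical standalone version of the helper, with all transform callables taking `(value, record)`; the sample record and transforms are illustrative, not from the Auth0 API):

```python
from typing import Any, Callable, Dict, Union

# property name -> nested mapping, or value type -> transform function
CoercionMapType = Dict[str, Union["CoercionMapType", Dict[type, Callable[[Any, dict], Any]]]]


def apply_record_transforms(obj: dict, coercion_map: CoercionMapType) -> dict:
    """Apply type-matched transform functions to a record, in place."""
    for key, value in coercion_map.items():
        obj_value = obj.get(key)
        if obj_value is None:
            continue
        # recurse into nested property mappings
        if any(isinstance(v, dict) for v in value.values()):
            apply_record_transforms(obj_value, value)
            continue
        # only transform values whose type has a registered function
        transform_fn = value.get(type(obj_value))
        if transform_fn is not None:
            obj[key] = transform_fn(obj_value, obj)
    return obj


transforms: CoercionMapType = {
    "scopes": {
        str: lambda value, record: value.split(),
        list: lambda value, record: [v.lower() for v in value],
    },
    "jwt_configuration": {
        # replace `exp` with the sibling `expires_in` value
        "exp": {str: lambda value, record: record["expires_in"]},
    },
}

record = {
    "scopes": "read:logs write:logs",
    "jwt_configuration": {"exp": "2524608000", "expires_in": 36000},
}
result = apply_record_transforms(record, transforms)
# `scopes` was a str, so it is split into a list;
# `jwt_configuration.exp` was a str, so it is replaced by `expires_in`
```

Because dispatch happens on `type(obj_value)`, the same property can be normalised whether the API returns it as a string or a list, which is awkward to express in a single simpleeval string.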

Ideally, this would integrate with the existing stream maps implementation, but a refactor of some sort is probably necessary to support native Python transform mappings and processing of nested properties.


Related to https://github.com/meltano/sdk/issues/792

stale[bot] commented 1 year ago

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

ReubenFrankel commented 1 year ago

Still relevant IMO.

stale[bot] commented 6 days ago

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

ReubenFrankel commented 6 days ago

Still relevant