meltano / sdk

Write 70% less code by using the SDK to build custom extractors and loaders that adhere to the Singer standard: https://sdk.meltano.com
Apache License 2.0

feat: `streams.core.Stream` should read the `key_properties` automatically. #2343

Closed: haleemur closed this issue 2 weeks ago

haleemur commented 6 months ago

Feature scope

Taps (catalog, state, stream maps, tests, etc.)

Description

If key_properties is present in the schema, the primary_keys attribute should be derived from it. This would let tap authors keep the definition in one place (the schema) instead of overriding primary_keys for each stream.

key_properties is part of the Singer spec. However, I could not find where primary_keys is set in these classes (Stream, GraphQLStream, RESTStream).
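For reference, in the Singer spec key_properties is a field of the SCHEMA message itself, sitting next to the JSON schema rather than inside it. A minimal illustration, using a hypothetical "events" stream with made-up properties:

```python
import json

# A Singer SCHEMA message: key_properties (and the optional bookmark_properties)
# are message-level fields, separate from the JSON schema under "schema".
schema_message = {
    "type": "SCHEMA",
    "stream": "events",  # hypothetical stream name
    "schema": {
        "type": "object",
        "properties": {
            "id": {"type": ["string", "null"]},
            "datetime": {"type": ["string", "null"], "format": "date-time"},
        },
    },
    "key_properties": ["id"],
    "bookmark_properties": ["datetime"],
}

# Taps write each message to stdout as a single line of JSON.
line = json.dumps(schema_message)
print(line)
```

This is why the proposal needs a place inside the schema file to carry the keys: the schema body alone has no standard slot for them.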

The SQLStream class defines primary_keys as self._singer_catalog_entry.metadata.root.table_key_properties or []. This is likely safe: if key properties are available in the metadata, they should be used. That said, I haven't read enough of the code to know exactly how the key properties get set in the metadata.
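In Singer catalog terms, the "root" metadata corresponds to the metadata entry with an empty breadcrumb, and its table-key-properties field carries the stream's key properties. A minimal sketch that reads it from a plain-dict catalog entry; it does not use the SDK's own metadata classes, and root_key_properties is a hypothetical helper:

```python
from __future__ import annotations

# A Singer catalog entry. Metadata is a list of entries keyed by "breadcrumb";
# the stream-level ("root") entry has an empty breadcrumb.
catalog_entry = {
    "tap_stream_id": "events",
    "schema": {"type": "object", "properties": {"id": {"type": ["string", "null"]}}},
    "metadata": [
        {"breadcrumb": [], "metadata": {"table-key-properties": ["id"]}},
        {"breadcrumb": ["properties", "id"], "metadata": {"inclusion": "automatic"}},
    ],
}


def root_key_properties(entry: dict) -> list[str]:
    """Return table-key-properties from the root metadata entry, or [] (like SQLStream's `or []`)."""
    for item in entry.get("metadata", []):
        if item.get("breadcrumb") == []:
            return list(item.get("metadata", {}).get("table-key-properties") or [])
    return []


print(root_key_properties(catalog_entry))  # ['id']
```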

edgarrmondragon commented 6 months ago

Hey @haleemur, thanks for filing!

Can you say more about where a key_properties definition would fit in a stream's JSON schema? What JSON schema features could we use for this? Or would this be a new top-level attribute that's not part of the JSON schema spec?

> This will allow tap authors to keep the definition contained in 1 place

Then we should probably include replication_key in there as well.

> and not have to override the method primary_keys for each stream.

I'm not sure I follow. You can have a parent class define primary_keys to avoid duplication.
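A sketch of the parent-class approach. It uses plain stand-in classes rather than the SDK's Stream so it runs on its own; all class names here are hypothetical:

```python
from __future__ import annotations


class Stream:
    """Hypothetical stand-in for a base stream class (not singer_sdk's Stream)."""

    primary_keys: list[str] | None = None
    replication_key: str | None = None


class IdKeyedStream(Stream):
    """Shared parent: every subclass inherits the same key definitions."""

    primary_keys = ["id"]
    replication_key = "updated"


class EventsStream(IdKeyedStream):
    name = "events"


class ProfilesStream(IdKeyedStream):
    name = "profiles"
    replication_key = "updated_at"  # a subclass can still override the inherited value


print(EventsStream.primary_keys, ProfilesStream.primary_keys)
```

This removes duplication across streams, but the keys still live in Python code rather than next to the schema they refer to.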

haleemur commented 6 months ago

Hmm, maybe I misunderstood the flow of data as well, and as a result my idea is not very clearly expressed. Please let me know if I'm wrong about the following design.

I see this pattern in several plugin codebases, and it requires the developer to specify details about one structure in several different places.

example (taken from tap-klaviyo):

  1. specify the schema in a file under the schemas directory
  2. define the stream as a class inheriting from a common base class (https://github.com/MeltanoLabs/tap-klaviyo/blob/main/tap_klaviyo/streams.py#L16; in this case KlaviyoStream <- RESTStream <- Stream)
  3. set the primary_keys attribute on the class to a list of field names.

In this example the schema is specified in one place, but the primary key is specified elsewhere. Maybe there's an opportunity to define primary_keys in the same place, if the schema file has a top-level attribute key_properties or $key_properties (confession: I don't know the significance of the $ sigil in the tap's JSON schema files).

example (using the same stream as above):

Maybe the schema file could be written as follows. The "$key_properties" line is the only material edit; the other edits just condense the schema vertically.

{
    "$key_properties": ["id"],
    "properties": {
        "name": {
            "type": ["string", "null"]
        },
        "id": {
            "description": "The event ID",
            "type": ["string", "null"]
        },
        "type": {
            "description": "The event type",
            "type": ["string", "null"]
        },
        "datetime": {
            "description": "Event timestamp in ISO 8601 format (YYYY-MM-DDTHH:MM:SS.mmmmmm)",
            "format": "date-time",
            "type": ["string", "null"]
        },
        "attributes": {
            "properties": {
                "metric_id": {
                    "description": "The metric ID",
                    "type": ["string", "null"]
                },
                "profile_id": {
                    "description": "Profile ID of the associated profile, if available",
                    "type": ["string", "null"]
                },
                "timestamp": {
                    "description": "Event timestamp in seconds",
                    "type": ["integer", "null"]
                },
                "event_properties": {
                    "description": "Event properties, can include attribution data, identifiers and extra properties",
                    "type": ["object", "null"]
                },
                "datetime": {
                    "description": "Event timestamp in ISO 8601 format (YYYY-MM-DDTHH:MM:SS.mmmmmm)",
                    "format": "date-time",
                    "type": ["string", "null"]
                },
                "uuid": {
                    "description": "A unique identifier for the event, this can be used as a cursor in pagination",
                    "type": ["string", "null"]
                }
            },
            "type": ["object", "null"]
        }
    }
}
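A minimal sketch of how a tap could read the proposed top-level "$key_properties" keyword from a schema file and check that every listed key is actually declared under "properties". The keyword is the one proposed above, not an existing SDK feature, and key_properties_from_schema is a hypothetical helper:

```python
import json

# Trimmed version of the schema above, using the proposed "$key_properties" keyword.
SCHEMA_TEXT = """
{
    "$key_properties": ["id"],
    "type": ["object", "null"],
    "properties": {
        "id": {"type": ["string", "null"]},
        "type": {"type": ["string", "null"]}
    }
}
"""


def key_properties_from_schema(schema: dict) -> list[str]:
    """Return the proposed "$key_properties" list, checking each key is a declared property."""
    keys = list(schema.get("$key_properties", []))
    missing = [k for k in keys if k not in schema.get("properties", {})]
    if missing:
        raise ValueError(f"$key_properties not declared in properties: {missing}")
    return keys


schema = json.loads(SCHEMA_TEXT)
print(key_properties_from_schema(schema))  # ['id']
```

Validating against "properties" catches typos early, which matters more once the keys are no longer checked by Python code that reviewers read alongside the stream class.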

This would make the stream class more concise. If we similarly read the name and replication_key from attributes in the schema, we could avoid declaring a couple more attributes in Python code. Sticking with the example above, the EventsStream class could be a little more concise; more importantly, all the declarative parts of the stream (schema, primary key and replication key) would be specified in one place.

class EventsStream(KlaviyoStream):
    """Define custom stream."""

    path = "/events"
    # No primary_keys here: under this proposal they come from "$key_properties" in event.json
    schema_filepath = SCHEMAS_DIR / "event.json"

    def post_process(
        self,
        row: dict,
        context: dict | None = None,  # noqa: ARG002
    ) -> dict | None:
        row["datetime"] = row["attributes"]["datetime"]
        return row

    @property
    def is_sorted(self) -> bool:
        return True

We could support this in a backward-compatible manner. When the Stream class reads the schema and finds the $key_properties attribute, it could set the stream's primary keys to its value. For example, the base class's __init__ method could include the logic below. All existing code would remain valid, and upgrading the SDK would still be safe, since setting primary_keys in a child class would still be supported.

    # In Stream.__init__, after the schema has been loaded; an explicit primary_keys still wins
    if "$key_properties" in self.schema and not self.primary_keys:
        self.primary_keys = self.schema["$key_properties"]
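A self-contained sketch of the precedence this implies, extended to replication_key as suggested earlier in the thread. It uses a stand-in base class instead of singer_sdk's Stream, and "$replication_key" is a hypothetical keyword; whether the real Stream.primary_keys can be assigned this way depends on how the SDK defines it, which is not verified here:

```python
from __future__ import annotations


class SchemaKeyedStream:
    """Hypothetical stand-in base class (not singer_sdk's Stream).

    Explicit class attributes win; otherwise fall back to the proposed
    "$key_properties" and hypothetical "$replication_key" schema keywords.
    """

    schema: dict = {}
    primary_keys: list[str] | None = None
    replication_key: str | None = None

    def __init__(self) -> None:
        if not self.primary_keys and "$key_properties" in self.schema:
            self.primary_keys = list(self.schema["$key_properties"])
        if self.replication_key is None and "$replication_key" in self.schema:
            self.replication_key = self.schema["$replication_key"]


class SchemaOnly(SchemaKeyedStream):
    schema = {"$key_properties": ["id"], "$replication_key": "datetime", "properties": {}}


class ExplicitOverride(SchemaKeyedStream):
    schema = {"$key_properties": ["id"], "properties": {}}
    primary_keys = ["uuid"]  # existing taps keep working: the explicit value wins


print(SchemaOnly().primary_keys, SchemaOnly().replication_key)  # ['id'] datetime
print(ExplicitOverride().primary_keys)  # ['uuid']
```

Checking "not self.primary_keys" rather than "is None" treats an empty list as unset, which matches the SQLStream fallback of "or []".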