dlt-hub / verified-sources

Contribute to dlt verified sources 🔥
https://dlthub.com/docs/walkthroughs/add-a-verified-source
Apache License 2.0

Support for SQL Server with Change Tracking (CT) or Change Data Capture (CDC) #490

Open f-tremblay opened 1 year ago

f-tremblay commented 1 year ago

Feature description

To load data from SQL Server, it's necessary to support reading incremental data updates from native features like Change Tracking (CT) or Change Data Capture (CDC).

Adopting a similar approach would allow incremental loading of data from SQL Server.
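To make the request concrete, here is a minimal sketch of what a Change Tracking based incremental read could look like. It only builds the query and advances a version cursor. The function names (`build_ct_query`, `split_changes`, `next_version`) are hypothetical and not part of dlt, the `?` placeholder assumes a DB-API driver with qmark parameters, and the table and key names are passed in by the caller and must be trusted identifiers.

```python
from typing import Any, Dict, Iterable, List, Tuple


def build_ct_query(table: str, primary_key: str) -> str:
    """Build a T-SQL query returning rows changed since a given CT version.

    The single `?` placeholder is the last synchronized version.
    Identifiers are interpolated as-is, so they must come from trusted config.
    """
    return (
        "SELECT ct.SYS_CHANGE_VERSION, ct.SYS_CHANGE_OPERATION, "
        f"ct.{primary_key}, t.* "
        f"FROM CHANGETABLE(CHANGES {table}, ?) AS ct "
        # LEFT JOIN so deleted rows (no longer in the table) still appear.
        f"LEFT JOIN {table} AS t ON t.{primary_key} = ct.{primary_key} "
        "ORDER BY ct.SYS_CHANGE_VERSION"
    )


def split_changes(
    rows: Iterable[Dict[str, Any]], primary_key: str
) -> Tuple[List[Dict[str, Any]], List[Any]]:
    """Separate inserts/updates ('I'/'U') from deleted keys ('D')."""
    upserts: List[Dict[str, Any]] = []
    deleted_keys: List[Any] = []
    for row in rows:
        if row["SYS_CHANGE_OPERATION"] == "D":
            deleted_keys.append(row[primary_key])
        else:
            upserts.append(row)
    return upserts, deleted_keys


def next_version(rows: Iterable[Dict[str, Any]], last_version: int) -> int:
    """Return the highest change version seen, or the old cursor if no rows."""
    return max((row["SYS_CHANGE_VERSION"] for row in rows), default=last_version)
```

The value returned by `next_version` is what a loader would persist between runs (for example in dlt resource state) and pass back as the `?` parameter on the next run. CDC would need a different query against the change tables and is not covered by this sketch.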

Are you a dlt user?

I'd consider using dlt, but it's lacking a feature I need.

Use case

No response

Proposed solution

No response

Related issues

No response

z3z1ma commented 1 year ago

dlt is a library, not an application, but you can probably open an issue in the repository below. They curate common sources based on demand and bandwidth, and community contributions are accepted: https://github.com/dlt-hub/verified-sources/

rudolfix commented 1 year ago

@z3z1ma would wrapping a Singer tap work? On the other hand, this tap is written in Clojure and you need the Java SDK to run it. But theoretically it could work, e.g. for Postgres?

z3z1ma commented 1 year ago

It would probably work. Alto is tied to PEX but could be extended to support arbitrary binaries. Currently, running a Singer tap in an isolated, cached environment with state managed by dlt looks like this:

/helpers

import typing as t

import alto.engine
import alto.utils
import dlt

def singer_resource_factory(
    bin_name: str, pip_url: str, /, config: dict, select: t.Optional[t.List[str]] = None
):
    """Factory for creating a dlt.resource function for a singer tap."""
    if select is None:
        select = ["*.*"]
    e = alto.engine.AltoTaskEngine(
        config={
            "default": {
                "load_path": "raw",
                "project_name": "dlt",
                "taps": {
                    f"tap-inline-{bin_name}": {
                        "capabilities": ["state", "catalog"],
                        "config": config,
                        "load_path": "raw",
                        "pip_url": pip_url,
                        "select": select,
                        "executable": bin_name,
                    }
                },
            }
        }
    )
    e.setup()
    (tap,) = alto.engine.make_plugins(
        f"tap-inline-{bin_name}",
        filesystem=e.filesystem,
        configuration=e.configuration,
    )

    @dlt.resource(name="tap")
    def _tap():
        state = dlt.current.resource_state()
        with alto.engine.tap_runner(
            tap,
            e.filesystem,
            e.alto,
            state_key=tap.name,
            state_dict=state,
        ) as tap_stream:
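            for payload in tap_stream:
                if payload is None:
                    continue
                typ, _, message = payload
                if typ == "STATE":
                    # Fold the tap's bookmark into the dlt-managed resource state.
                    alto.utils.merge(message["value"], state)
                elif typ == "RECORD":
                    # Route each record to a table named after its Singer stream.
                    yield dlt.mark.with_table_name(message["record"], message["stream"])
                elif typ == "SCHEMA":
                    # Schema messages are ignored; dlt infers the schema from the data.
                    pass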

    return _tap
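The loop inside `_tap` follows the usual Singer message convention: STATE messages update the bookmark, RECORD messages carry rows tagged with a stream name, and SCHEMA messages are skipped. Below is a dependency-free sketch of that dispatch, working on raw JSON lines as a tap would print them. `route_singer_messages` and `deep_merge` are hypothetical names, and `deep_merge` only assumes that `alto.utils.merge` performs a recursive merge of source into target; that behaviour is not confirmed in this thread.

```python
import json
from typing import Any, Dict, Iterable, Iterator, Tuple


def deep_merge(source: Dict[str, Any], target: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge `source` into `target` in place (assumed semantics)."""
    for key, value in source.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            deep_merge(value, target[key])
        else:
            target[key] = value
    return target


def route_singer_messages(
    lines: Iterable[str], state: Dict[str, Any]
) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (stream, record) pairs and fold STATE messages into `state`."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank output lines
        message = json.loads(line)
        typ = message.get("type")
        if typ == "STATE":
            deep_merge(message["value"], state)
        elif typ == "RECORD":
            yield message["stream"], message["record"]
        # SCHEMA (and any other message type) is ignored in this sketch.
```

Because the generator mutates `state` in place, whatever owns that dictionary (dlt resource state in the helper above) sees the latest bookmark once the stream has been consumed.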

And a source:

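import dlt

# Assumes the helper module above is importable as `singer`; the import is not shown in this thread.
@dlt.source(section="linkedin_ads", max_table_nesting=3, root_key=True)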
def linkedin_ads(access_token=dlt.secrets.value):
    yield singer.singer_resource_factory(
        "tap-linkedin-ads",
        "git+https://github.com/singer-io/tap-linkedin-ads#egg=tap-linkedin-ads",
        config={
            "access_token": access_token,
            "start_date": "2021-01-01T00:00:00Z",
            "accounts": "...",
            "request_timeout": 300,
            "user_agent": "harness/tap-linkedin-ads",
        },
    )