dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Dagster Scheduler not updating DLT start_time #25733

Open walter-robson opened 3 weeks ago

walter-robson commented 3 weeks ago

What's the issue?

I have several DLT pipelines deployed and orchestrated with Dagster. When I run a pipeline manually by materializing it in Dagster, the start_time for the incremental load is updated properly for subsequent runs. However, when the pipelines run on a schedule, the start_time (time.last_value) stays at whatever the last manual materialization set it to. Scheduled runs do not update the start time.

What did you expect to happen?

I expect the start_time for the pipeline to be updated for subsequent runs.

How to reproduce?

Below is an example resource:

    @dlt.resource(
        name="ticket_metric_events", primary_key="id", write_disposition="append"
    )
    def ticket_metric_table(
        zendesk_client: ZendeskAPIClient,
        time: dlt.sources.incremental[str] = dlt.sources.incremental(
            "time",
            initial_value=start_date_iso_str,  
            allow_external_schedulers=False,
            last_value_func=max,
            row_order="asc",  # pages are returned in ascending chronological order
        ),
    ) -> Iterator[TDataItem]:
        """
        Resource for ticket metric events table. Returns all the ticket metric events from the starting date,
        with the default starting date being January 1st of the current year.

        Args:
            zendesk_client: The Zendesk API client instance, used to make calls to Zendesk API.
            time: Incremental source for the 'time' column,
                indicating the starting date for retrieving ticket metric events.
                Defaults to dlt.sources.incremental("time", initial_value=start_date_iso_str).

        Yields:
            TDataItem: Dictionary containing the ticket metric event data.
        """

        metric_event_pages = zendesk_client.get_pages(
            "/api/v2/incremental/ticket_metric_events",
            "ticket_metric_events",
            PaginationType.CURSOR,
            params={
                "start_time": ensure_pendulum_datetime(time.last_value).int_timestamp, # This is what is not updating
            },
        )
        print(ensure_pendulum_datetime(time.last_value).int_timestamp)
        print(time.last_value)
        for page in metric_event_pages:
            yield page

            if time.end_out_of_range:
                return

Here is an example of how the resource is materialized:

@dlt_assets(
    dlt_source=zendesk_support().with_resources("ticket_metric_events"), 
    dlt_pipeline=pipeline(
        pipeline_name="zendesk_data",
        dataset_name="dlt_zendesk_data",
        destination="snowflake",
        progress="log",
    ),
    name="zendesk",
    group_name="zendesk",
)
def dagster_zendesk_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    yield from dlt.run(context=context)

Here is an example of the job definition:

ticket_metric_events = AssetSelection.assets("dlt_zendesk_support_ticket_metric_events")
ticket_metric_events_job = define_asset_job(name="support_ticket_metric_events_job", 
                                            selection=ticket_metric_events,
                                            description="dlt_zendesk_support_ticket_metric_events asset materialization")

Here is an example of the job schedule:

ticket_metric_events_schedule = ScheduleDefinition(
    job=ticket_metric_events_job,
    cron_schedule="30 5 * * *",
    description="materializes the ticket metric events job Every Morning, at 1:30",
)

In the logs, we can see that the start_time does not change across subsequent scheduled Dagster runs.
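
The expected behavior can be sketched without dlt at all. The snippet below is a simplified simulation, not dlt's implementation: `run_once`, the `state` dict, and the sample rows are hypothetical names made up for illustration, and the strict `>` comparison is a simplification. Each run reads the stored cursor, loads only newer rows, and saves the maximum `time` it saw so that the next run starts from there.

```python
from typing import Dict, List

# Hypothetical stand-in for the API: rows in ascending chronological order.
ROWS: List[Dict[str, str]] = [
    {"id": "1", "time": "2024-01-01T00:00:00Z"},
    {"id": "2", "time": "2024-01-02T00:00:00Z"},
    {"id": "3", "time": "2024-01-03T00:00:00Z"},
]


def run_once(
    state: Dict[str, str],
    available: List[Dict[str, str]],
    initial_value: str,
) -> List[Dict[str, str]]:
    """Load rows newer than the stored cursor, then advance the cursor."""
    start = state.get("last_value", initial_value)
    # ISO 8601 strings in the same format and timezone sort chronologically.
    new_rows = [row for row in available if row["time"] > start]
    if new_rows:
        # Mirrors last_value_func=max: keep the largest value seen so far.
        state["last_value"] = max(row["time"] for row in new_rows)
    return new_rows


state: Dict[str, str] = {}  # persisted between runs in a real pipeline
first = run_once(state, ROWS[:2], initial_value="2023-12-31T00:00:00Z")
second = run_once(state, ROWS, initial_value="2023-12-31T00:00:00Z")
```

In this sketch the second run picks up only the row added after the first run, because `state["last_value"]` advanced. The scheduled runs described in this issue behave as if that stored value never advances.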

Dagster version

1.8.13

Deployment type

Dagster Helm chart

Deployment details

Deployed on Azure Kubernetes Services using Helm

Additional information

No response

cmpadden commented 2 weeks ago

Hi @walter-robson, thank you for opening an issue.

I spoke with some team members internally, and we're still struggling to understand why the behavior would differ between a manual invocation of the asset materialization and a scheduled one. Can you confirm that by manual materialization you mean materializing through the UI?

We'd also be curious to know: if you schedule the job, wait for it to complete, and then query the time column in the destination table, does max(time) match what you would expect the next incremental run to start from?
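
That comparison can be sketched in plain Python. This is a minimal sketch, not dlt or Dagster code: `next_start_time` and the sample values are hypothetical, and it assumes the time column holds ISO 8601 strings in UTC. It takes the largest loaded `time` and converts it to the integer timestamp that the next run would be expected to send as `start_time`.

```python
from datetime import datetime, timezone
from typing import Iterable


def next_start_time(time_values: Iterable[str]) -> int:
    """Return the Unix timestamp of the largest ISO 8601 `time` value."""
    # Replace a trailing "Z" so fromisoformat accepts it on older Pythons.
    parsed = [datetime.fromisoformat(v.replace("Z", "+00:00")) for v in time_values]
    latest = max(parsed)
    return int(latest.astimezone(timezone.utc).timestamp())


# Hypothetical values, as if read from the time column of the destination table.
loaded = [
    "2024-01-01T00:00:00Z",
    "2024-01-03T12:30:00Z",
    "2024-01-02T08:00:00Z",
]
expected_start = next_start_time(loaded)
```

If the value printed by the resource at the start of the next scheduled run is older than `expected_start`, the cursor was not advanced even though the rows were loaded.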

The way you've implemented this is quite similar to how we are using dlt in our internal pipelines, see:

https://github.com/dagster-io/dagster-open-platform/blob/1650338b50f422ab47929c143fddd45551c1f660/dagster_open_platform/dlt/sources/github/__init__.py#L107-L112

Grasping at straws, I would also be curious whether anything changes if you rename the argument from time to something that does not conflict with the standard-library time module, but I don't suspect that is really the issue.
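
For context on why the name is unlikely to matter, here is a small illustration of how a parameter named `time` shadows the module. The function names are hypothetical and nothing here touches dlt. The shadowing is limited to the body of the function that declares the parameter.

```python
import time  # standard-library module


def uses_module() -> bool:
    # No parameter named `time` here, so the name resolves to the module.
    return hasattr(time, "sleep")


def shadows_module(time: str) -> bool:
    # Inside this function, `time` is the string argument, not the module.
    return hasattr(time, "sleep")


module_visible = uses_module()
module_visible_when_shadowed = shadows_module("2024-01-01T00:00:00Z")
```

The shadowing would only cause trouble if the resource body tried to call something like `time.sleep(...)`, which the code in this issue does not do.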

Another debugging option is to construct the dlt.sources.incremental value in the body of the function instead of as a default argument, and add some logging so that we can see whether a value is being passed to the resource:

    def ticket_metric_table(
        zendesk_client: ZendeskAPIClient,
        time: Optional[dlt.sources.incremental[str]] = None
    ) -> Iterator[TDataItem]:
        # add some logging here to see what `time` is...
        if not time:
            time = dlt.sources.incremental(
                "time",
                initial_value=start_date_iso_str,  
                allow_external_schedulers=False,
                last_value_func=max,
                row_order="asc",  # pages are returned in ascending chronological order
            )
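
The logging mentioned in the comment above could look roughly like the helper below. This is a sketch under assumptions: `describe_incremental`, the logger name, and `FakeIncremental` are hypothetical, and the helper only assumes that the object passed as `time` may expose a `last_value` attribute, as the resource in this issue uses.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("zendesk_debug")  # hypothetical logger name


def describe_incremental(cursor: Optional[Any]) -> str:
    """Build a one-line summary of the value passed as `time`."""
    if cursor is None:
        return "time=None (no incremental passed; the fallback will be used)"
    # getattr with a default covers objects that lack the attribute.
    last_value = getattr(cursor, "last_value", "<no last_value attribute>")
    return f"time={type(cursor).__name__} last_value={last_value!r}"


class FakeIncremental:
    """Stand-in for dlt.sources.incremental, used only to exercise the helper."""

    last_value = "2024-01-02T00:00:00Z"


logger.info(describe_incremental(None))
logger.info(describe_incremental(FakeIncremental()))
```

Calling the helper at the top of the resource in both a manual and a scheduled run would show whether the two paths receive the same kind of object and the same `last_value`.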