dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Python API for launching backfills #5428

Open dagsterbot[bot] opened 3 years ago

dagsterbot[bot] commented 3 years ago

Issue from the Dagster Slack

create python API for launching backfills for a job

This issue was generated from the slack conversation at: https://dagster.slack.com/archives/C01U954MEER/p1620888955102000?thread_ts=1620888955.102000&cid=C01U954MEER

Conversation excerpt:

U01C6NT8PCY: Hi, we have a pipeline that runs daily (on a schedule). The pipeline's run config gets an effective date which drives the logic for its data extract. A by-product of the pipeline run is that certain conditions might be met that necessitate the same pipeline (or a similar one) being run with a range of backdated effective_dates, a backfill of sorts.

The backfill would need a PartitionSet with a partition_fn that works out the dates based on something we can access at the time it is called, so it might need access to a Resource, or at least an asset for which we've created an AssetMaterialization event. But I don't have access to a context with a run_id, so I'm not sure how best to achieve this.

Even if I can work out how to create the PartitionSet, how do I kick off the backfill? It seems I can only do it via the CLI or the Dagit UI. I would like to be able to do it via the Python or GraphQL API.

What I want to do seems like a reasonably commonplace use case, so I feel I must be missing something obvious about the way Dagster backfills work. Can someone help? Thanks, Paul
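For what it's worth, the launchPartitionBackfill mutation the thread goes on to discover is served by Dagit over plain HTTP, so it can be driven from Python with an ordinary HTTP client. A minimal sketch, assuming Dagit at http://localhost:3000; the partition set name, repository names, and partition keys are placeholders, and the success type name varies across Dagster versions:

    import requests

    # Placeholder names throughout; adjust to your repository and partition set.
    LAUNCH_BACKFILL = """
    mutation LaunchBackfill($partitionNames: [String!]!) {
      launchPartitionBackfill(backfillParams: {
        selector: {
          partitionSetName: "date_partition_set"
          repositorySelector: {
            repositoryName: "my_repository"
            repositoryLocationName: "my_location"
          }
        }
        partitionNames: $partitionNames
      }) {
        ... on PartitionBackfillSuccess {
          backfillId
          launchedRunIds
        }
      }
    }
    """

    response = requests.post(
        "http://localhost:3000/graphql",  # assumed Dagit address
        json={
            "query": LAUNCH_BACKFILL,
            "variables": {"partitionNames": ["2021-05-14", "2021-05-17"]},
        },
    )
    response.raise_for_status()
    print(response.json())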

UH3RM70A2: <@UM49TQ8EB>

U01C6NT8PCY: OK, I think I can work out a way to do this:

Part 1: Create the PartitionSet dynamically. I played around with generating variable partition sets with random numbers and could see that the Dagit UI at least showed different partition axis values each time I went in. So I can use DagsterInstance to get an asset based on an asset_key and pull the data I need from a JSON metadata entry. The PartitionSet will only ever have values generated from the most recent asset materialization event for the asset_key I care about, but that's probably acceptable.
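On recent Dagster versions, the DagsterInstance lookup sketched in Part 1 can be done directly; a hedged sketch, where the asset key reference_data and the effective_dates JSON metadata entry are hypothetical:

    from dagster import AssetKey, DagsterInstance

    def partition_keys_from_latest_materialization():
        # Load the instance configured via DAGSTER_HOME.
        instance = DagsterInstance.get()
        # Only the most recent materialization of the asset is consulted,
        # matching the "probably acceptable" caveat above.
        event = instance.get_latest_materialization_event(AssetKey("reference_data"))
        if event is None or event.asset_materialization is None:
            return []
        # "effective_dates" is a hypothetical JSON metadata entry recorded on
        # the AssetMaterialization; .value unwraps the metadata entry.
        return list(event.asset_materialization.metadata["effective_dates"].value)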

Part 2: Just noticed GraphQL has a launchPartitionBackfill mutation, so I'll have a look at that.

UM49TQ8EB: <@U01C6NT8PCY> Yeah, launchPartitionBackfill might serve your purposes, though it's worth noting that you might need to have the dagster-daemon running for the individual runs to get launched. There is a forceSynchronousSubmission flag that would submit all the runs in the graphql request if that's something you need.

UM49TQ8EB: Under the hood, it's constructing a PartitionBackfill object, adding it to the instance (so that it appears in the persisted backfill table), and then submitting the runs using a submit_backfill_runs call.

U01C6NT8PCY: I tried launchPartitionBackfill with the following:

mutation positionsBackfill(
      $partitionNames: [String!]!
    )
    {
      launchPartitionBackfill(backfillParams: {
        selector: {
          partitionSetName: "date_partition_set"
          repositorySelector: {
            repositoryName: "&lt;reponame&gt;"
            repositoryLocationName: "&lt;repolocationname&gt;"
          }
        }
        partitionNames: $partitionNames
        forceSynchronousSubmission: true
      }) 
      {
        ... on PartitionBackfillSuccess {
          backfillId
          launchedRunIds
        }
        ... on PipelineConfigValidationInvalid {
          pipelineName
          errors {
            message
            reason
          }
        }
      }
    }

and the query variables: {"partitionNames": ["2021-05-14", "2021-05-17"]}, and I got back:

  "data": {
    "launchPartitionBackfill": {
      "backfillId": "mzxeaeqk",
      "launchedRunIds": []
    }
  }
}

Why no run IDs? Also, I didn't see any sign of it in dagit, and dagster-daemon is running.

U01C6NT8PCY: Oh sorry - my bad. The partition set had datetimes in it rather than just dates. Sorted.

UM49TQ8EB: Nice! Let me know if you run into any other issues. Also, I would watch out for potential dagit perf issues when running with the forceSynchronousSubmission flag set to true. If you can run the dagster-daemon with your instance, you might want to consider turning that flag off.

U01C6NT8PCY: We actually set forceSynchronousSubmission to false but dagit still goes into nowhere land for a while. This may not be a problem since this may well be done overnight. Still, I would like to know why it does that.

U01C6NT8PCY: Hey <@UM49TQ8EB>, I need to bump this topic. We've run into a new issue, having converted our repository names to match the recommendations laid out in the migration guide (https://docs.dagster.io/guides/dagster/graph_job_op). For the graphql query we use to launch backfills, we need a repository name, but this now varies according to the environment. I don't think there's a clean approach to specifying the repository_name for the query - we have to stuff it in config and get it out at runtime. Has there been any thought to providing a dagster API to launch backfills? I don't think adding something like this to the Python GraphQL client would help much, but I guess it would at least allow the client to infer the repository name if we didn't supply it.

UM49TQ8EB: Hi Paul… can you change the graphql mutation to take the repository name in as a variable? Something like this:

mutation positionsBackfill(
  $repositoryName: String!
  $repositoryLocationName: String!
  $partitionNames: [String!]!
) {
  launchPartitionBackfill(backfillParams: {
    selector: {
      partitionSetName: "date_partition_set"
      repositorySelector: {
        repositoryName: $repositoryName
        repositoryLocationName: $repositoryLocationName
      }
    }
    partitionNames: $partitionNames
    forceSynchronousSubmission: true
  }) {
    ... on PartitionBackfillSuccess {
      backfillId
      launchedRunIds
    }
  }
}

U01C6NT8PCY: Yes, and I'm already doing that. The problem is that it's just another variable we have to configure somehow, and it becomes another thing to remember to change if we ever change our repository name scheme. Pre op/graph/job we always kept the same repository name across all environments, so we could keep this name in a global dict, but now we need to make it a per-environment variable.

UM49TQ8EB: I see. For your use case, if we had a Python API for launching backfills, would you have the job definition in scope?

UM49TQ8EB: e.g. launch_backfills_for_job(my_job, partition_keys_to_backfill)

U01C6NT8PCY: Yes, that would work for me, I think. You may be interested in why we want to programmatically launch backfills. In our case, which I don't think is unique, a historic change to some reference data can be detected, which necessitates some backfilling with ETL pipelines. The reference data might typically be finance-related, e.g. changes to prices, indices, etc.

UM49TQ8EB: <@U018K0G2Y85> issue create python API for launching backfills for a job
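There is still no launch_backfills_for_job in Dagster's public Python API. With the job definition in scope, one hedged approximation is to loop execute_in_process over partition keys; the helper name below is hypothetical, and unlike a real backfill the runs execute serially in the calling process rather than through the daemon:

    from dagster import DagsterInstance

    def launch_backfill_for_job(job_def, partition_keys):
        """Hypothetical stand-in for the proposed launch_backfills_for_job."""
        instance = DagsterInstance.get()
        results = []
        for key in partition_keys:
            # execute_in_process accepts a partition_key for partitioned jobs;
            # each call blocks until that partition's run finishes.
            results.append(job_def.execute_in_process(partition_key=key, instance=instance))
        return results

This trades the backfill daemon's bookkeeping and parallelism for simplicity, so it only suits small partition ranges.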


Message from the maintainers:

Are you looking for the same documentation content? Give it a :thumbsup:. We factor engagement into prioritization.

alvarogomezuria commented 1 year ago

I have a very similar use case, and I'm also looking for this to be implemented.

harrylojames commented 1 year ago

+1. Currently, backfills are the only way I can see to materialize a set of partitions in one go. Being able to do this from the Python API would be a major help in my workflow. This issue would solve the problem I described in https://github.com/dagster-io/dagster/issues/14905 equally well, or better.
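For the asset case described here, newer Dagster releases do let you materialize one partition at a time from Python via materialize(); a hedged sketch with placeholder names, where each call is its own synchronous run rather than a managed backfill:

    from dagster import DagsterInstance, materialize

    def materialize_partitions(assets, partition_keys):
        instance = DagsterInstance.get()
        for key in partition_keys:
            # One synchronous run per partition key; failures raise by default.
            materialize(assets, partition_key=key, instance=instance)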