dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Possibility to pass config to op steps run through launchPartitionBackfill, similarly to launchPipelineExecution #23304

Closed: bojanstavrikj closed 2 months ago

bojanstavrikj commented 2 months ago

What's the use case?

Hello

I have an asset with a static partition, backed by a graph of ops that run sequentially.

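from dagster import StaticPartitionsDefinition, graph_asset, op

@op
def op1(upstream):
    print(f'this is the op1, op2 returned {upstream}')
    return upstream

@op
def op2(input):
    # `input` has no upstream op, so its value has to come from run config
    print(f'this is the input I got {input}')
    return input

@graph_asset(partitions_def=StaticPartitionsDefinition(['Jan2024', 'Feb2024', 'Mar2024']))
def asset1():
    # ops run sequentially: op2 first, then op1
    return op1(op2())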

I am trying to run it through GraphQL using launchPartitionBackfill. It runs if I remove the input argument from op2. With the argument in place, I cannot find a way to send the inputs through config. The GraphQL code I use is the following:

mutation {
  launchPartitionBackfill(
    backfillParams: {
      selector: {
        repositorySelector: {
          repositoryName: "__repository__"
          repositoryLocationName: "orchestrator"
        }
        partitionSetName: "materialize_asset_extrato_mensal_partition_set"
      }
      partitionNames: ["Feb2024"]
      assetSelection: {
        path: "extrato_mensal"
      }
      title: "test"
      description: "test description"
    }
  ) {
    __typename
    ... on LaunchBackfillSuccess {
      backfillId
    }
    ... on PythonError {
      message
      stack
    }
  }
}

I want to pass something like this:

runConfigData: {
  ops: {
    asset1: {
      ops: {
        op2: {
          inputs: {
            input: "test"
          }
        }
      }
    }
  }
}

I cannot find anything about this in the documentation or online. Is there already a way to pass run config like this to launchPartitionBackfill? If not, my other option is to add some business logic to the partitions and fetch the inputs I need from the database based on the partition key. Thanks in advance!
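The fallback described above, deriving the inputs from the partition key, could look roughly like this. It is only a minimal sketch: the `PARTITION_INPUTS` table is a hypothetical stand-in for the database, and `inputs_for_partition` is an illustrative helper, not part of Dagster. Inside an op, the key would come from `context.partition_key`.

```python
# Hypothetical stand-in for a database table keyed by partition name.
PARTITION_INPUTS = {
    "Jan2024": {"input": "value-for-jan"},
    "Feb2024": {"input": "value-for-feb"},
    "Mar2024": {"input": "value-for-mar"},
}


def inputs_for_partition(partition_key: str) -> dict:
    """Return the op inputs registered for one partition key."""
    try:
        return PARTITION_INPUTS[partition_key]
    except KeyError:
        # Fail loudly instead of silently running with missing inputs.
        raise ValueError(f"no inputs registered for partition {partition_key!r}") from None


# Inside an op this would be called as:
#     inputs = inputs_for_partition(context.partition_key)
```

With this approach the backfill mutation needs no run config at all, at the cost of moving the input values out of the request and into storage.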

bojanstavrikj commented 2 months ago

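I found a way to do it by looking at the GraphQL query that the Dagster UI sends when doing the same thing. It uses launchPipelineExecution rather than launchPartitionBackfill and selects the partition with a dagster/partition tag.

Based on that, the following solution works: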

query = """mutation LaunchPipelineExecution($executionParams: ExecutionParams!) {
  launchPipelineExecution(executionParams: $executionParams) {
    ... on LaunchRunSuccess {
      run {
        id
        pipelineName
        __typename
      }
      __typename
    }
    ... on PipelineNotFoundError {
      message
      __typename
    }
    ... on InvalidSubsetError {
      message
      __typename
    }
    ... on RunConfigValidationInvalid {
      errors {
        message
        __typename
      }
      __typename
    }
    ...PythonErrorFragment
    __typename
  }
}

fragment PythonErrorFragment on PythonError {
  message
  stack
  errorChain {
    ...PythonErrorChain
    __typename
  }
  __typename
}

fragment PythonErrorChain on ErrorChainLink {
  isExplicitLink
  error {
    message
    stack
    __typename
  }
  __typename
}"""

Then pass the variables along with the query, like this:

variables = {
  "executionParams": {
    "selector": {
      "jobName": "yourJobName",
      "repositoryName": "yourRepoName",
      "repositoryLocationName": "yourRepoLocName",
      "assetSelection": [
        {
          "path": [
            "yourAssetName"
          ]
        }
      ],
      "assetCheckSelection": []
    },
    "runConfigData": "ops:\n  yourAssetName:\n    ops:\n      yourOpName:\n        inputs:\n          input1: '1'\n          input2: '2'\n          input3: 3",
    "mode": "default",
    "executionMetadata": {
      "tags": [
        {
          "key": "dagster/partition",
          "value": "Feb2024"
        }
      ]
    }
  }
}
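
To tie the two pieces together, here is a minimal sketch of building the variables programmatically and posting them with the query using only the standard library. The helper names (`build_variables`, `build_request`, `launch`) and the URL `http://localhost:3000/graphql` are assumptions for illustration. The sketch also passes `runConfigData` as a nested dict instead of a YAML string, on the assumption that the endpoint accepts both; if it does not, the YAML string shown above can be substituted.

```python
import json
import urllib.request


def build_variables(job_name, repo_name, location_name, asset_name,
                    partition_key, op_name, op_inputs):
    """Build the executionParams variables for a single partition."""
    return {
        "executionParams": {
            "selector": {
                "jobName": job_name,
                "repositoryName": repo_name,
                "repositoryLocationName": location_name,
                "assetSelection": [{"path": [asset_name]}],
                "assetCheckSelection": [],
            },
            # Assumption: a nested dict is accepted here in place of YAML text.
            "runConfigData": {
                "ops": {asset_name: {"ops": {op_name: {"inputs": op_inputs}}}}
            },
            "mode": "default",
            "executionMetadata": {
                "tags": [{"key": "dagster/partition", "value": partition_key}]
            },
        }
    }


def build_request(url, query, variables):
    """Wrap query and variables in a JSON POST request (nothing is sent yet)."""
    body = json.dumps({"query": query, "variables": variables}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def launch(url, query, variables):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(build_request(url, query, variables)) as resp:
        return json.load(resp)


# Example call, with `query` being the mutation string defined earlier:
#     variables = build_variables("yourJobName", "yourRepoName", "yourRepoLocName",
#                                 "yourAssetName", "Feb2024", "yourOpName",
#                                 {"input1": "1"})
#     result = launch("http://localhost:3000/graphql", query, variables)
```

Launching several partitions this way means one request per partition key, since each run carries a single dagster/partition tag.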