quantile-development / dagster-meltano

A Dagster plugin that allows you to run Meltano in Dagster
MIT License

Injecting parameters to dagster ops/job #19

Closed slamer59 closed 1 year ago

slamer59 commented 1 year ago

Hello, I have a run like this: meltano run tap-rest-api-msdk target-jsonl

and a meltano.yaml

plugins:
  extractors:
  - name: tap-rest-api-msdk
    variant: widen
    pip_url: tap-rest-api-msdk
    config:
      api_url: ...
      streams:
       - name: ...
         ...
         params:
           date_after: 01/01/2021
           ...
         primary_keys:
         - id

Is it possible to override parameters and inject them with _defs definitions at any level?

Reading your code, this does not seem feasible (because of command.split()). Meltano also does not allow injecting variables via the CLI; the only options are to override meltano.yaml or to inject environment variables, which we cannot do once Dagster is already running.

How would you override date_after on the fly without touching the file?

JulesHuisman commented 1 year ago

@slamer59 One way we could achieve this is by altering the `ins` of the Meltano run op: https://github.com/quantile-development/dagster-meltano/blob/a6e597e21313ad0f721a6b5d286e73f27ce29946/dagster_meltano/ops.py#L39

If we create some sort of custom typing for Meltano config, this could be injected on the fly by the previous op.

slamer59 commented 1 year ago

I need to think about this more, since I have never used ins. Another way is to pass an extra argument through meltano_run_op

def meltano_run_op(command: str, args: dict) -> 

and pass it to

log_results = meltano_resource.meltano_invoker.run_and_log(
    "run",
    MetadataLogProcessor,
    command.split(),
    **args,
)

and so on, down to subprocess.Popen.

By the way, out of curiosity, why don't you build on top of dagster-shell?

JulesHuisman commented 1 year ago

Ah, so you don't need to inject values at Dagster runtime. Yes, then a solution like this would work. It relates to #18, since you can set config using environment variables.
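
To make the idea concrete, here is a minimal sketch of forwarding extra environment variables to the child process through subprocess.Popen. It is not the dagster-meltano implementation: build_env, run_with_env and the variable name TAP_REST_API_MSDK_EXAMPLE_SETTING are hypothetical names used only for illustration, and a small Python child stands in for Meltano so the sketch runs without a Meltano project.

```python
import os
import shlex
import subprocess
import sys


def build_env(overrides: dict) -> dict:
    """Return a copy of the current environment with the overrides applied."""
    env = os.environ.copy()
    # Environment values must be strings, so coerce whatever the caller passed.
    env.update({key: str(value) for key, value in overrides.items()})
    return env


def run_with_env(argv: list, overrides: dict) -> str:
    """Run argv in a child process that sees the overridden environment."""
    process = subprocess.Popen(
        argv,
        env=build_env(overrides),  # only the child sees these values
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    output, _ = process.communicate()
    if process.returncode != 0:
        raise RuntimeError(f"exit code {process.returncode}: {output}")
    return output


# Hypothetical setting name; the real name depends on the plugin's settings.
OVERRIDES = {"TAP_REST_API_MSDK_EXAMPLE_SETTING": "01/01/2022"}

# What the real argument list could look like (not executed here).
MELTANO_ARGV = ["meltano", *shlex.split("run tap-rest-api-msdk target-jsonl")]

# Stand-in child that just echoes the variable it received.
CHILD_ARGV = [
    sys.executable,
    "-c",
    "import os; print(os.environ['TAP_REST_API_MSDK_EXAMPLE_SETTING'])",
]

if __name__ == "__main__":
    print(run_with_env(CHILD_ARGV, OVERRIDES).strip())
```

Because the overrides are applied to a copy of os.environ, the running Dagster process itself is left untouched and each run can carry its own values.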

> By the way, out of curiosity, why don't you build on top of dagster-shell?

I wanted to process the subprocess's stdout and stderr in real time to gather Singer.io statistics. Looking at it again, though, this might be possible with dagster-shell as well.
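
As a rough illustration of what processing the output in real time can look like, the sketch below reads a child process line by line while it runs and sums record counts from metric lines. It is not the MetadataLogProcessor from this repository. The "METRIC: " marker, the "record_count" metric name and the "stream" tag are assumptions about the log format, and FAKE_TAP is a stand-in child process rather than a real tap.

```python
import json
import subprocess
import sys
from typing import Iterable, Iterator, Optional

METRIC_MARKER = "METRIC: "  # assumption: metric lines look like "INFO METRIC: {...}"


def stream_lines(argv: list) -> Iterator[str]:
    """Yield the child's combined stdout/stderr line by line while it runs."""
    with subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so one loop sees everything
        text=True,
        bufsize=1,  # line-buffered reads on the parent side
    ) as process:
        assert process.stdout is not None
        for line in process.stdout:
            yield line.rstrip("\n")
    # Leaving the with-block waits for the child, so returncode is set here.
    if process.returncode != 0:
        raise RuntimeError(f"child exited with code {process.returncode}")


def parse_metric(line: str) -> Optional[dict]:
    """Return the JSON payload of a metric line, or None for any other line."""
    _, marker, payload = line.partition(METRIC_MARKER)
    if not marker:
        return None
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        return None


def collect_record_counts(lines: Iterable[str]) -> dict:
    """Sum record_count metrics per stream as the lines arrive."""
    totals: dict = {}
    for line in lines:
        metric = parse_metric(line)
        if metric and metric.get("metric") == "record_count":
            stream = metric.get("tags", {}).get("stream", "unknown")
            totals[stream] = totals.get(stream, 0) + int(metric["value"])
    return totals


SAMPLE_LINES = [
    "INFO starting sync",
    'INFO METRIC: {"type": "counter", "metric": "record_count", "value": 3, "tags": {"stream": "users"}}',
    'INFO METRIC: {"type": "counter", "metric": "record_count", "value": 2, "tags": {"stream": "users"}}',
]

# Stand-in child that writes the sample lines to stderr, like a tap logging.
FAKE_TAP = [
    sys.executable,
    "-c",
    f"import sys\nfor line in {SAMPLE_LINES!r}:\n    print(line, file=sys.stderr)",
]

if __name__ == "__main__":
    print(collect_record_counts(stream_lines(FAKE_TAP)))
```

The generator keeps reading and aggregation separate, so the same loop could forward each line to a logger before it is parsed.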