quantile-development / dagster-meltano

A Dagster plugin that allows you to run Meltano in Dagster
MIT License
41 stars 17 forks source link

How can I set a value for `max_retries`? #50

Open taeefnajib opened 7 months ago

taeefnajib commented 7 months ago

I'm using this code in my __init__.py file:

from dagster import Definitions, job, repository
from dagster_meltano import meltano_resource, meltano_run_op, load_jobs_from_meltano_project

meltano_jobs = load_jobs_from_meltano_project("<path to dagster project>")

defs = Definitions(jobs=meltano_jobs)

I'm getting the following error:

Exceeded max_retries of 0
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_plan.py", line 286, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 478, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 160, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/execute_step.py", line 95, in _process_asset_results_to_events
    for user_event in user_event_sequence:
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/compute.py", line 212, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn, compute_context):
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/compute.py", line 181, in _yield_compute_results
    for event in iterate_with_context(
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 463, in iterate_with_context
    with context_fn():
  File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 72, in op_execution_error_boundary
    raise RetryRequestedFromPolicy(
The above exception was caused by the following exception:
dagster_meltano.exceptions.MeltanoCommandError: Command 'run tap-csv target-iceberg --force' failed with exit code 1
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 465, in iterate_with_context
    next_output = next(iterator)
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/compute_generator.py", line 131, in _coerce_op_compute_fn_to_iterator
    result = invoke_compute_fn(
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster/_core/execution/plan/compute_generator.py", line 125, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster_meltano/ops.py", line 93, in dagster_op
    output = meltano_resource.execute_command(f"{command}", env, context.log)
  File "/media/taeef/Projects4/Sidetrek/DataEngineering/dag-mel/dag/dagvenv/lib/python3.10/site-packages/dagster_meltano/meltano_resource.py", line 70, in execute_command
    raise MeltanoCommandError(
I tried changing the value for retries in `launchpad` but it is not working. Can we set the value in `load_jobs_from_meltano_project()` method?