PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

prefect_dbt can't run `dbt source freshness` #13612

Status: Open. briankski opened this issue 1 month ago.

briankski commented 1 month ago

First check

Bug summary

The `dbt source freshness` command (https://docs.getdbt.com/reference/commands/source) can't be run from within a Prefect task or flow. Both `trigger_dbt_cli_command` and `DbtCoreOperation` fail when running source freshness.

With `trigger_dbt_cli_command`, the failure is caused by the ordering of the CLI args when the command is built (https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py#L160-L169), which produces the following command:

Running dbt command: ['source', '--profiles-dir', '/Users/briankalinowski/.dbt', '--project-dir', PosixPath('/Users/briankalinowski/Code/data-prefect/data-dbt'), 'freshness']
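To make the ordering problem concrete, here is a minimal, dependency-free sketch that mimics the argument list in the log line above. It assumes, based only on that log line, that the builder splits `command`, appends `--profiles-dir` and `--project-dir`, and only then appends `extra_command_args`. Both function names are hypothetical; the second one shows one possible reordering and is not the actual prefect-dbt code or fix.

```python
import os
from typing import List, Optional, Sequence


def build_args_as_logged(
    command: str,
    profiles_dir: str,
    project_dir: Optional[str] = None,
    extra_command_args: Optional[Sequence[str]] = None,
) -> List[str]:
    """Hypothetical: reproduce the logged order (command tokens, flags, then extras)."""
    args = [tok for tok in command.split() if tok != "dbt"]
    args += ["--profiles-dir", profiles_dir]
    if project_dir is not None:
        # Kept as a plain str here; the logged list contained a PosixPath object.
        args += ["--project-dir", os.path.expanduser(project_dir)]
    args += list(extra_command_args or [])
    return args


def build_args_subcommand_first(
    command: str,
    profiles_dir: str,
    project_dir: Optional[str] = None,
    extra_command_args: Optional[Sequence[str]] = None,
) -> List[str]:
    """Hypothetical alternative: extras go directly after the command tokens,
    so a nested subcommand such as 'freshness' follows 'source' immediately."""
    args = [tok for tok in command.split() if tok != "dbt"]
    args += list(extra_command_args or [])
    args += ["--profiles-dir", profiles_dir]
    if project_dir is not None:
        args += ["--project-dir", os.path.expanduser(project_dir)]
    return args


if __name__ == "__main__":
    print(build_args_as_logged("source", "/path/to/.dbt", "/path/to/project", ["freshness"]))
    print(build_args_subcommand_first("source", "/path/to/.dbt", "/path/to/project", ["freshness"]))
```

In the first ordering, `--profiles-dir` lands between `source` and `freshness`, so it is parsed as an option of the `source` group, which likely explains the `No such option: --profiles-dir` error. Placing the extras first keeps `freshness` next to `source`, while option-style extras such as `--select` still end up after the subcommand.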

No matter which style is used, each one results in either a `source freshness` command-not-found error or `dbt.cli.exceptions.DbtUsageException: No such option: --profiles-dir`.

With `DbtCoreOperation`, the command actually runs successfully, but when it finishes Prefect still raises an error and marks the task as failed. See the stack trace below for that example.

Reproduction

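from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation, trigger_dbt_cli_command

@flow
def bk_flow():
    trigger_dbt_cli_command(command="source", extra_command_args=["freshness"])
    # or
    trigger_dbt_cli_command(command="source freshness")

# Separate reproduction using DbtCoreOperation
@flow
def bk_flow():
    d = "path/to/your/dbt-project"

    DbtCoreOperation(
        commands=["dbt source freshness"],
        project_dir=d,
    ).run()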

Error

01:17:16  89 of 93 PASS freshness of stripe_raw.subscription_item ........................ [PASS in 2.03s]
18:17:16.686 | INFO    | Flow run 'pearl-jaguarundi' - PID 71034 stream output:
01:17:16  91 of 93 PASS freshness of tailscale_com.pages ................................. [PASS in 1.76s]
18:17:17.071 | INFO    | Flow run 'pearl-jaguarundi' - PID 71034 stream output:
01:17:17  90 of 93 PASS freshness of stripe_raw.tier ..................................... [PASS in 2.16s]
18:17:21.452 | INFO    | Flow run 'pearl-jaguarundi' - PID 71034 stream output:
01:17:21  92 of 93 PASS freshness of woodchipper.woodchipper ............................. [PASS in 6.48s]
18:17:21.672 | INFO    | Flow run 'pearl-jaguarundi' - PID 71034 stream output:
01:17:21  93 of 93 PASS freshness of woodchipper.woodchipper_ssh ......................... [PASS in 6.58s]
18:17:21.780 | INFO    | Flow run 'pearl-jaguarundi' - PID 71034 stream output:
01:17:21
18:17:21.781 | INFO    | Flow run 'pearl-jaguarundi' - PID 71034 stream output:
01:17:21  Done.
18:17:22.437 | ERROR   | Flow run 'pearl-jaguarundi' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/engine.py", line 877, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/Code/data-prefect/data_prefect/flows/dbt/dbt_flows.py", line 159, in bk_flow
    ).run()
      ^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 304, in coroutine_wrapper
    return call()
           ^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
    return self.result()
           ^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect_shell/commands.py", line 401, in run
    await shell_process.wait_for_completion()
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect_shell/commands.py", line 182, in wait_for_completion
    raise RuntimeError(
RuntimeError: PID 71034 failed with return code 1.
18:17:22.462 | ERROR   | Flow run 'pearl-jaguarundi' - Finished in state Failed('Flow run encountered an exception. RuntimeError: PID 71034 failed with return code 1.')
Traceback (most recent call last):
  File "/Users/briankalinowski/Code/data-prefect/data_prefect/flows/dbt/dbt_flows.py", line 165, in <module>
    bk_flow()
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/flows.py", line 1229, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/engine.py", line 293, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/engine.py", line 396, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/engine.py", line 877, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/Code/data-prefect/data_prefect/flows/dbt/dbt_flows.py", line 159, in bk_flow
    ).run()
      ^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 304, in coroutine_wrapper
    return call()
           ^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
    return self.result()
           ^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect_shell/commands.py", line 401, in run
    await shell_process.wait_for_completion()
  File "/Users/briankalinowski/miniconda3/envs/prefect_env/lib/python3.11/site-packages/prefect_shell/commands.py", line 182, in wait_for_completion
    raise RuntimeError(
RuntimeError: PID 71034 failed with return code 1.
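The traceback ends in `prefect_shell`'s `wait_for_completion`, which raises `RuntimeError: PID 71034 failed with return code 1.` even though the visible freshness checks all report PASS. In other words, the run's state appears to follow the dbt process's exit code rather than its log output. The stdlib sketch below reproduces that pattern with a hypothetical `run_and_check` helper; it illustrates the check implied by the error message and is not the actual `prefect_shell` implementation.

```python
import subprocess
import sys
from typing import Sequence


def run_and_check(cmd: Sequence[str]) -> int:
    """Hypothetical helper: run a command, then raise if it exits non-zero,
    regardless of what the process printed while running."""
    proc = subprocess.Popen(list(cmd))
    returncode = proc.wait()
    if returncode != 0:
        # Same shape as the message in the traceback above.
        raise RuntimeError(f"PID {proc.pid} failed with return code {returncode}.")
    return returncode


if __name__ == "__main__":
    # A child process that prints a success-looking line but exits with status 1.
    child = [sys.executable, "-c", "print('Done.'); raise SystemExit(1)"]
    try:
        run_and_check(child)
    except RuntimeError as exc:
        print(exc)
```

The child prints `Done.`, yet the helper still raises, which mirrors how the flow run above is marked Failed after dbt's output ends with `Done.`.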

Versions

Version:             2.19.1
API version:         0.8.4
Python version:      3.11.8
Git commit:          17a1b1d8
Built:               Thu, May 16, 2024 3:33 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         server

Additional context

No response

desertaxle commented 1 month ago

Thanks for raising this issue @briankski! We've added this issue to our backlog and will start working on it shortly!

briankski commented 1 month ago

Has this been fixed in a newer version? When I logged this I was on prefect==2.19.1 and prefect-dbt==0.5.0.