efranksrecroom opened 1 year ago
Hi @efranksrecroom, thanks for submitting the bug report and the thorough writeup!
I'm tagging @ahuang11 so that he can give this a look when he has a moment.
To debug this and determine whether it's really prefect-dbt or something else, can you change the weekly/production test dbt run scripts to just the imports:
import subprocess
from prefect_dbt.cli.commands import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
I suspect it's not prefect-dbt itself but the two-layer imports:
File "/opt/prefect/production_test_dbt_run.py", line 7, in <module>
from dbt_provider import DbtProvider
File "/opt/prefect/dbt_provider.py", line 3, in <module>
from prefect_dbt import DbtCliProfile
Just did. Here is the flow:
import asyncio

from prefect import flow
from prefect_dbt.cli.commands import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
async def production_test_dbt_run(databricks_warehouse_id: str = 'fadf894ad6', schema_name: str = 'prefect', catalog_name: str = 'dev'):
    pass

if __name__ == "__main__":
    asyncio.run(production_test_dbt_run())
The other one is basically the same, just with a different name. The result is the same:
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 883, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "/opt/prefect/production_test_dbt_run.py", line 6, in <module>
from prefect_dbt.cli.commands import DbtCliProfile
ModuleNotFoundError: No module named 'prefect_dbt'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 260, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 182, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(import_object, str(import_path))
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 193, in import_object
module = load_script_as_module(script_path)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'production_test_dbt_run.py' encountered an exception
Thanks for helping me disprove my initial hypothesis!
Now, I want to confirm whether both flows have the exact same packages installed with pip list.
Can you run this concurrently and paste the logs?
import asyncio
import subprocess

from prefect import flow, get_run_logger

@flow
async def production_test_dbt_run(databricks_warehouse_id: str = 'fadf894ad6', schema_name: str = 'prefect', catalog_name: str = 'dev'):
    logger = get_run_logger()
    # Capture the output so it lands in the flow run logs;
    # os.system would only log the command's exit code.
    logger.info(subprocess.run(["pip", "list"], capture_output=True, text=True).stdout)

if __name__ == "__main__":
    asyncio.run(production_test_dbt_run())
Perhaps as a workaround, in your deployment, can you try adding EXTRA_PIP_PACKAGES?
{
"EXTRA_PIP_PACKAGES": "prefect-dbt"
}
Examples: https://discourse.prefect.io/t/creating-and-deploying-a-custom-kubernetes-infrastructure-block/1531#h-3-next-we-can-create-a-custom-infrastructure-block-to-use-this-job-6 https://docs.prefect.io/concepts/infrastructure/?h=extra_pip_packages#installing-extra-dependencies-at-runtime
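For context, the Prefect base images consume EXTRA_PIP_PACKAGES at container startup, once per pod. Roughly, per the docs linked above, the behavior is equivalent to this Python sketch (an approximation of the documented behavior, not the actual entrypoint script in the image):

import os
import subprocess

# Approximation of the documented EXTRA_PIP_PACKAGES behavior:
# install the space-separated packages once, at container startup.
extra = os.environ.get("EXTRA_PIP_PACKAGES", "").split()
if extra:
    subprocess.check_call(["pip", "install", *extra])
# ...the flow run engine starts only after this completes.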
Here is the package list. Both flows have the same list, and there is no issue spawning N of these flows with prefect-dbt removed as an import.
adlfs==2023.1.0 agate==1.6.3 aiohttp==3.8.3 aiosignal==1.3.1 aiosqlite==0.18.0 alembic==1.9.2 anyio==3.6.2 apprise==1.2.1 asgi-lifespan==2.0.0 async-timeout==4.0.2 asyncpg==0.27.0 attrs==22.2.0 azure-core==1.26.2 azure-datalake-store==0.0.52 azure-identity==1.12.0 azure-storage-blob==12.14.1 Babel==2.11.0 cachetools==5.3.0 certifi==2022.12.7 cffi==1.15.1 charset-normalizer==2.1.1 click==8.1.3 cloudpickle==2.2.1 colorama==0.4.5 coolname==2.2.0 croniter==1.3.8 cryptography==39.0.0 databricks-sql-connector==2.3.0 dateparser==1.1.6 dbt-core==1.3.2 dbt-databricks==1.3.0 dbt-extractor==0.4.1 dbt-spark==1.3.1 docker==6.0.1 fastapi==0.89.1 frozenlist==1.3.3 fsspec==2023.1.0 future==0.18.3 google-auth==2.16.0 graphql-core==3.2.3 greenlet==2.0.1 griffe==0.25.4 h11==0.14.0 h2==4.1.0 hologram==0.0.15 hpack==4.0.0 httpcore==0.16.3 httpx==0.23.3 hyperframe==6.0.1 idna==3.4 isodate==0.6.1 Jinja2==3.1.2 jsonpatch==1.32 jsonpointer==2.3 jsonschema==3.2.0 kubernetes==25.3.0 leather==0.3.4 Logbook==1.5.3 lz4==4.3.2 Mako==1.2.4 Markdown==3.4.1 markdown-it-py==2.1.0 MarkupSafe==2.1.2 mashumaro==3.0.4 mdurl==0.1.2 minimal-snowplow-tracker==0.0.2 msal==1.21.0 msal-extensions==1.0.0 msgpack==1.0.4 msrest==0.7.1 multidict==6.0.4 networkx==2.8.8 numpy==1.23.4 oauthlib==3.2.2 orjson==3.8.5 packaging==21.3 pandas==1.5.3 parsedatetime==2.4 pathspec==0.9.0 pendulum==2.1.2 portalocker==2.7.0 prefect @ file:///opt/prefect/dist/prefect.tar.gz prefect-dbt==0.2.7 prefect-fivetran==0.1.0b0 prefect-shell==0.1.3 psutil==5.9.4 pyarrow==11.0.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 pycparser==2.21 pydantic==1.10.4 Pygments==2.14.0 PyJWT==2.6.0 pyparsing==3.0.9 pyrsistent==0.19.3 python-dateutil==2.8.2 python-slugify==7.0.0 pytimeparse==1.1.8 pytz==2022.7.1 pytz-deprecation-shim==0.1.0.post0 pytzdata==2020.1 PyYAML==6.0 readchar==4.0.3 regex==2022.10.31 requests==2.28.2 requests-oauthlib==1.3.1 rfc3986==1.5.0 rich==13.2.0 rsa==4.9 sgqlc==16.1 six==1.16.0 sniffio==1.3.0 SQLAlchemy==1.4.46 sqlparams==5.0.0 sqlparse==0.4.3 starlette==0.22.0 text-unidecode==1.3 thrift==0.16.0 toml==0.10.2 typer==0.7.0 typing_extensions==4.4.0 tzdata==2022.7 tzlocal==4.2 urllib3==1.26.14 uvicorn==0.20.0 websocket-client==1.4.2 Werkzeug==2.2.2 yarl==1.8.2
As I mentioned, as long as I only run 1 flow that uses prefect-dbt, it runs fine, so I know the package is there. But if I run multiple flows that import it, they fail with the import error. Initially, I thought it had to do with pygit2, but when I removed that package, the error remained until I removed prefect-dbt.
I have numerous other flows for other tools like Fivetran and Databricks that use this same pattern of a provider class encapsulating the logic and the import statements, and none of them encounter this concurrency issue.
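For reference, a minimal way to kick off several runs of the same deployment at once, to reproduce the pattern above, might look like this sketch (the deployment name is a placeholder):

import asyncio

from prefect.deployments import run_deployment

async def fire_concurrent_runs(n: int = 3):
    # Each flow run gets its own Kubernetes Job pod, so each pod
    # performs its own package setup independently at startup.
    await asyncio.gather(
        *(
            run_deployment(
                name="production_test_dbt_run/production_test_dbt_run",  # placeholder
                timeout=0,  # return immediately instead of waiting for completion
            )
            for _ in range(n)
        )
    )

if __name__ == "__main__":
    asyncio.run(fire_concurrent_runs())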
What if you keep pygit2 but exclude prefect-dbt? I suspect it'd result in the same error as prefect-dbt.
Would you be able to try using EXTRA_PIP_PACKAGES?
I also pinged others from the core team.
Your hunch is correct for pygit2 (also prefect-shell), but if I swap in prefect-fivetran, there are no errors.
I can definitely try adding EXTRA_PIP_PACKAGES if you point me in the right direction. For deployments, I build them from this:
deployment = Deployment.build_from_flow(
    flow=flow,
    name=flow_name,
    description=description,
    work_queue_name=work_queue_name,
    storage=self.storage,
    infrastructure=KubernetesJob(
        finished_job_ttl=60,
        job_watch_timeout_seconds=60,
    ),
    infra_overrides={'image': f'{self.image}', 'namespace': f'{self.namespace}'},
    tags=tags,
    schedule=IntervalSchedule(interval=interval_in_seconds, anchor_date=schedule_anchor),
    parameters=flow_parameters,
    is_schedule_active=False,
)
Can you try this:
deployment = Deployment.build_from_flow(
    flow=flow,
    name=flow_name,
    description=description,
    work_queue_name=work_queue_name,
    storage=self.storage,
    infrastructure=KubernetesJob(
        finished_job_ttl=60,
        job_watch_timeout_seconds=60,
        env={"EXTRA_PIP_PACKAGES": "prefect-dbt"},
    ),
    infra_overrides={'image': f'{self.image}', 'namespace': f'{self.namespace}'},
    tags=tags,
    schedule=IntervalSchedule(interval=interval_in_seconds, anchor_date=schedule_anchor),
    parameters=flow_parameters,
    is_schedule_active=False,
)
That does appear to fix it! I was able to fire off a dozen or so simultaneous flows that access this same set of packages with no issue.
That said, this seems contrary to the recommended approach in the documentation.
I can use this as a temporary workaround, but I assume I'll still want to keep building my own image and remove this once the bug is identified and fixed?
One more note on the bug that I was going to add, as it just started happening when I added retries (and before I pushed out your changes): if I add automatic retries with a 10-second delay, the first retry will fail with the ModuleNotFoundError, but the second retry will run again.
My guess is that the image is still installing packages, but the flow executes prematurely, before setup is complete.
A potential reason it's only affecting prefect-dbt is that prefect-dbt's requirements are slightly more complex, taking a bit longer to install. Either that, or prefect-dbt is installed near the end, and by the time the flow runs, it's not yet installed.
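If that race is the cause, one hypothetical stopgap in the flow script would be to poll until the package is importable instead of importing it at the top of the module (a defensive sketch, not an official recommendation):

import importlib
import time

def wait_for_module(name: str, attempts: int = 30, delay: float = 2.0):
    # Poll until `name` can be imported, tolerating a pod whose
    # package installation hasn't finished yet (hypothetical helper).
    for _ in range(attempts):
        try:
            return importlib.import_module(name)
        except ModuleNotFoundError:
            time.sleep(delay)
    raise ModuleNotFoundError(f"{name!r} never became importable")

prefect_dbt = wait_for_module("prefect_dbt")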
Actually, I was wondering: instead of putting the image under infra_overrides, can you put it under KubernetesJob? That might be the root of the issue. https://docs.prefect.io/api-ref/prefect/infrastructure/#prefect.infrastructure.KubernetesJob
deployment = Deployment.build_from_flow(
    flow=flow,
    name=flow_name,
    description=description,
    work_queue_name=work_queue_name,
    storage=self.storage,
    infrastructure=KubernetesJob(
        finished_job_ttl=60,
        job_watch_timeout_seconds=60,
        image=self.image,
    ),
    infra_overrides={'namespace': f'{self.namespace}'},
    tags=tags,
    schedule=IntervalSchedule(interval=interval_in_seconds, anchor_date=schedule_anchor),
    parameters=flow_parameters,
    is_schedule_active=False,
)
Just gave that a try and got the error again.
Trying to run actual commands has led to another error regarding the package(s) not being installed :(
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1481, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 110, in trigger_dbt_cli_command
raise ImportError(
ImportError: dbt-core needs to be installed to use this task; run `pip install "prefect-dbt[cli]"
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 636, in orchestrate_flow_run
result = await flow_call()
File "/opt/prefect/statsig_production_dbt_run.py", line 20, in statsig_production_dbt_run
await provider.run_cli_command(command)
File "/opt/prefect/dbt_provider.py", line 59, in run_cli_command
await self.ensure_dependencies()
File "/opt/prefect/dbt_provider.py", line 51, in ensure_dependencies
await trigger_dbt_cli_command('dbt deps', dbt_cli_profile=self.dbt_cli_profile, project_dir='./dbt')
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1082, in get_task_call_return_value
return await future._result()
File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 89, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1481, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 110, in trigger_dbt_cli_command
raise ImportError(
ImportError: dbt-core needs to be installed to use this task; run `pip install "prefect-dbt[cli]"
09:59:30 AM
Finished in state Failed('Task run encountered an exception: ImportError: dbt-core needs to be installed to use this task; run `pip install "prefect-dbt[cli]"\n')
I think you need to install prefect-dbt[cli]:
env={"EXTRA_PIP_PACKAGES": "prefect-dbt[cli]"}
Let me know if that works.
That seems to work, but it again seems very odd, given that package should be installed via the other requirement?
prefect-dbt supports both dbt Cloud and dbt Core (CLI), so to reduce dependencies, we separated dbt Core (CLI) into setup extras. However, I don't have strong feelings about moving the setup extras into requirements.txt. cc @desertaxle
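For context, the split looks roughly like this in a package's setup.py (an illustrative sketch of setup extras in general, not the actual prefect-dbt setup file):

from setuptools import setup

setup(
    name="prefect-dbt",
    install_requires=[
        # always installed with `pip install prefect-dbt`
        "prefect>=2.0",
    ],
    extras_require={
        # only installed with `pip install "prefect-dbt[cli]"`
        "cli": ["dbt-core"],
    },
)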
Just a note: while I can run 2 at the same time now, doing so results in the 1st one crashing mid-run...
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1481, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command
result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect_shell/commands.py", line 114, in shell_run_command
raise RuntimeError(msg)
RuntimeError: Command failed with exit code 2:
Could not run dbt
02:30:53 PM
trigger_dbt_cli_command-321ca940-1
Finished in state Failed('Task run encountered an exception: RuntimeError: Command failed with exit code 2:\n Could not run dbt\n\n')
02:30:53 PM
trigger_dbt_cli_command-321ca940-1
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 636, in orchestrate_flow_run
result = await flow_call()
File "/opt/prefect/production_test_dbt_run.py", line 23, in production_test_dbt_run
await provider.run_cli_command(command)
File "/opt/prefect/dbt_provider.py", line 56, in run_cli_command
await trigger_dbt_cli_command(command_text, dbt_cli_profile=self.dbt_cli_profile, profiles_dir='./dbt/profiles', overwrite_profiles=True, project_dir='./dbt')
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1082, in get_task_call_return_value
return await future._result()
File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 89, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1481, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command
result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect_shell/commands.py", line 114, in shell_run_command
raise RuntimeError(msg)
RuntimeError: Command failed with exit code 2:
Could not run dbt
I suspect this error is unrelated, i.e., there's a misconfiguration in the dbt profile, but the logs are not ideal for proving that (I'm working on a refactor of prefect-dbt that uses the new prefect-shell block for better logs).
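In the meantime, one way to isolate the profile is to run dbt debug through the same task, reusing the profiles_dir/project_dir from your traceback (a sketch):

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
async def debug_dbt_profile():
    # `dbt debug` validates the profile, target, and adapter without
    # running any models, so a misconfiguration should surface clearly.
    await trigger_dbt_cli_command(
        "dbt debug",
        profiles_dir="./dbt/profiles",
        project_dir="./dbt",
    )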
Can you try running the dbt commands directly, without the flows/packages?
Yes, they run fine. They have been running on Databricks Workflows for many weeks...
I think this is again that same issue, and I'm going to have to add literally every package from my Dockerfile install into that deployment hack. Attached are the Job pod logs for 3 concurrent runs. You'll note that for the first (malachite-grebe), requirements are fine and it just runs; the other 2 must install packages. I have zero clue why this is an issue for dbt flows and not Fivetran: I can run 10+ Fivetran flows at the same time and have never hit this issue.
green-agouti-nd9w6-k7pb8.log malachite-grebe-lff8z-7xl78.log rose-civet-wxstc-tnhqc.log
I think this is the issue:
Credentials in profile "rec_room", target "dev" invalid: Runtime Error
Could not find adapter type databricks!
Defined profiles:
- rec_room
For more information on configuring profiles, please consult the dbt docs:
Some dbt CLI profiles require additional installation; for example Databricks:
pip install dbt-databricks
So I think:
env={"EXTRA_PIP_PACKAGES": "prefect-dbt[cli],dbt-databricks"}
Indeed, that's what I'm doing right now. As I mentioned, I'm going to put my entire Dockerfile RUN pip install command into EXTRA_PIP_PACKAGES to unblock myself while this bug is being ironed out.
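One way to avoid hand-maintaining that value is to derive it from the same requirements.txt the Dockerfile installs (a sketch, assuming one requirement per line):

from pathlib import Path

# Build a space-separated EXTRA_PIP_PACKAGES value from the same
# requirements.txt the Dockerfile installs (hypothetical helper).
requirements = [
    line.strip()
    for line in Path("requirements.txt").read_text().splitlines()
    if line.strip() and not line.startswith("#")
]
env = {"EXTRA_PIP_PACKAGES": " ".join(requirements)}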
Bug summary
I'm not 100% positive that this has anything to do with prefect-dbt; however, it is the only set of flows experiencing this issue. I have several flows that are very simple and execute a dbt CLI command. If I run them 1 at a time, each flow loads fine and the code runs. If, however, I attempt to run 2 flows that use the prefect-dbt package, the second flow will not run and will keep returning an error similar to the one below until the 1st flow has finished running.
In addition to this error, obtained from the UI, I also see the following in my cluster logs.
Attached are 2 sample flows that execute dbt CLI commands along with the definition of the DbtProvider class. I've also attached my k8s yaml file. Note I've changed the extensions so that I can attach them and faked some IDs.
dbt_provider.txt production_test_dbt_run.txt weekly_tests_dbt_run.txt dev.txt
Reproduction
Deployment: Azure Kubernetes, using a custom image created from a Dockerfile that has only the following 2 lines:
Error
Versions
Additional context
No response