Since Prefect exposes an async Python interface and Django does not play well with async functions, we split the Prefect interface off into a FastAPI project.
These endpoints will be called only from the Django server or from testing scripts. More project documentation can be found in the wiki.
Clone the Prefect Proxy repository
In the cloned repository, run the following commands:
pyenv local 3.10
pyenv exec python -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
Create .env from .env.template.
Set the value of LOGDIR in the .env file to the name of the directory that will hold the logs. The directory is created automatically when the Prefect proxy runs.
Start Prefect on port 4200:
prefect server start
Then set PREFECT_API_URL in .env to http://localhost:4200/api. Change the port in this URL if you are running Prefect on a different port.
Next, start your Prefect worker(s). On Tech4Dev's production system, Dalgo runs three workers on two queues called ddp and manual-dbt:
prefect worker start -q ddp --pool dalgo_work_pool
prefect worker start -q manual-dbt --pool dalgo_work_pool
(Make sure to set up the work pool from the Prefect UI first.)
The proxy server needs to listen for requests coming from Django; pick an available port and run:
gunicorn proxy.main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:<port number>
Make sure to add this URL, including the port number, to the .env for DDP_backend in the variable PREFECT_PROXY_API_URL.
All orchestration flow runs (scheduled or manual) are executed in Prefect by these workers. Dalgo needs to be notified when these flow runs reach a terminal state (success or failure) so that it can free resources and notify users of failures.
Steps to create a webhook in Prefect:
In the Prefect UI, open Notifications.
Create a notification of type Custom Webhook.
Set the Webhook URL to http://localhost:8002/webhooks/v1/notification/ (assuming the Django server is listening on http://localhost:8002).
Set the Method to POST.
Set the Headers as shown below. The notification key here should be the one set in the Dalgo backend .env under PREFECT_NOTIFICATIONS_WEBHOOK_KEY:
{"X-Notification-Key": "********"}
Set the JSON Data to:
{"body": "{{body}}"}
The Run states that we are interested in are Completed, Cancelled, Crashed, Failed and TimedOut.
Use the Prefect API GET http://localhost:4200/api/flow_run_notification_policies/{id} to make sure that this notification has this message_template:
Flow run {flow_run_name} with id {flow_run_id} entered state {flow_run_state_name}
If it does not, update the policy with PATCH http://localhost:4200/api/flow_run_notification_policies/{id}
The Prefect API docs are available at http://localhost:4200/api/docs
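To illustrate what a receiver can do with this notification, the sketch below checks the X-Notification-Key header and pulls the flow run name, id and state out of a body that follows the message_template above. This is a minimal sketch and not the Dalgo backend's actual handler: the names `is_authorized`, `parse_notification`, `MESSAGE_PATTERN` and `TERMINAL_STATES` are hypothetical, and it assumes the body arrives exactly in the template's format.

```python
import hmac
import re

# Mirrors the message_template configured on the notification policy.
MESSAGE_PATTERN = re.compile(
    r"Flow run (?P<flow_run_name>.+) with id (?P<flow_run_id>\S+) "
    r"entered state (?P<flow_run_state_name>\w+)"
)

# The run states selected when creating the notification.
TERMINAL_STATES = {"Completed", "Cancelled", "Crashed", "Failed", "TimedOut"}


def is_authorized(headers: dict, expected_key: str) -> bool:
    """Compare the X-Notification-Key header with the configured key."""
    received = headers.get("X-Notification-Key", "")
    # compare_digest avoids leaking key contents through timing differences.
    return hmac.compare_digest(received.encode(), expected_key.encode())


def parse_notification(payload: dict):
    """Return the template fields as a dict, or None if the body does not match."""
    match = MESSAGE_PATTERN.search(payload.get("body", ""))
    return match.groupdict() if match else None
```

A receiver built along these lines would reject the request when `is_authorized` returns False, and otherwise act on `flow_run_id` only when `flow_run_state_name` is one of `TERMINAL_STATES`.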
FastAPI endpoints are defined in main.py. These typically call functions in service.py.
Most communication with Prefect is via its SDK, but occasionally we need to make HTTP requests; these are done using:
service.prefect_get
service.prefect_post
service.prefect_patch
service.prefect_delete
The Prefect API's base URL is set in .env via the variable PREFECT_API_URL.
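As a rough illustration of how an HTTP helper can combine this base URL with an endpoint path, here is a minimal sketch using only the standard library. It is not the code in service.py: `build_url` and `get_json` are hypothetical names, and the sketch assumes the values from .env have already been exported into the process environment.

```python
import json
import os
import urllib.request
from typing import Any, Optional


def build_url(endpoint: str, base_url: Optional[str] = None) -> str:
    """Join the Prefect API base URL and an endpoint path with exactly one slash."""
    # Fall back to PREFECT_API_URL, then to the local default used in this README.
    base = base_url or os.getenv("PREFECT_API_URL", "http://localhost:4200/api")
    return f"{base.rstrip('/')}/{endpoint.lstrip('/')}"


def get_json(endpoint: str, timeout: float = 30.0) -> Any:
    """Hypothetical GET helper: fetch an endpoint and decode the JSON response."""
    with urllib.request.urlopen(build_url(endpoint), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With a Prefect server running locally, `get_json("flow_run_notification_policies/<id>")` would fetch the policy discussed in the webhook section; stripping slashes in `build_url` keeps the result stable whether or not PREFECT_API_URL ends with a slash.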
Logs are sent to a single logfile called prefect-proxy.log, which is written to the LOGDIR specified in the .env.
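The behaviour described here (create the log directory if it is missing, then write everything to one file) can be sketched with the standard logging module. This is an illustrative sketch under stated assumptions, not the proxy's actual logging setup: `setup_logger`, the logger name and the log format are all hypothetical.

```python
import logging
import os


def setup_logger(logdir: str, name: str = "prefect-proxy") -> logging.Logger:
    """Create logdir if needed and attach a file handler for prefect-proxy.log."""
    os.makedirs(logdir, exist_ok=True)
    logfile = os.path.abspath(os.path.join(logdir, "prefect-proxy.log"))
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # Skip attaching a second handler if this logfile is already configured.
    already_attached = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == logfile
        for h in logger.handlers
    )
    if not already_attached:
        handler = logging.FileHandler(logfile)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    return logger
```

Calling `setup_logger(os.environ["LOGDIR"])` once at startup would be enough; the duplicate-handler check keeps repeated calls from writing each message more than once.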
Tests are run via pytest:
GOOGLE_APPLICATION_CREDENTIALS=<your/credentials.json> pytest