Closed caleb-recursion closed 2 years ago
I got past the above error by manually deleting the ~/.prefect/orion.db file and running prefect orion database reset. Now the deployment creation is throwing a 409 conflict error.
Traceback (most recent call last):
File "prefect_submit.py", line 197, in <module>
app.run(run)
File "/home/rxrx/git/mousera2/build/venv/lib/python3.8/site-packages/absl/app.py", line 312, in run
_run_main(main, args)
File "/home/rxrx/git/mousera2/build/venv/lib/python3.8/site-packages/absl/app.py", line 258, in _run_main
sys.exit(main(argv))
File "prefect_submit.py", line 193, in run
asyncio.run(main(script_name, ecr_docker_image_uri))
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "prefect_submit.py", line 180, in main
deployment_id = await spec.create(client=client)
File "/home/rxrx/git/mousera2/build/venv/lib/python3.8/site-packages/prefect/client.py", line 105, in with_injected_client
return await fn(*args, **kwargs)
File "/home/rxrx/git/mousera2/build/venv/lib/python3.8/site-packages/prefect/deployments.py", line 233, in create
return await client.create_deployment(
File "/home/rxrx/git/mousera2/build/venv/lib/python3.8/site-packages/prefect/client.py", line 1271, in create_deployment
response = await self._client.post(
File "/home/rxrx/git/mousera2/build/venv/lib/python3.8/site-packages/httpx/_client.py", line 1842, in post
return await self.request(
File "/home/rxrx/git/mousera2/build/venv/lib/python3.8/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/rxrx/git/mousera2/build/venv/lib/python3.8/site-packages/prefect/client.py", line 278, in send
response.raise_for_status()
File "/home/rxrx/git/mousera2/build/venv/lib/python3.8/site-packages/prefect/client.py", line 224, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '409 Conflict' for url 'https://api-beta.prefect.io/api/accounts/cb6c3dbf-c8d9-4435-ab0d-08783d7d1dc3/workspaces/f0694c09-fb21-4f56-a735-17a25f7d9e56/deployments/'
Response: {'detail': 'Data integrity conflict. This usually means a unique or foreign key constraint was violated. See server logs for details.'}
For more information check: https://httpstatuses.com/409
Everything is on 2.0b11
I tried deleting and recreating the workspace in the cloud with no luck.
Hi Caleb! Thanks for reporting this. We're currently making a lot of changes, and some of them are breaking changes, so "pardon our dust." It won't last much longer!
I couldn't reproduce that error with a simple deployment and flow I created for this purpose, so I'm probably not hitting the same code path you did. Would you mind sharing an example flow and deployment that reproduce the error?
Hi @abrookins. Thanks for getting back. I understand you're making changes. Is beta.prefect.io only supporting the newest version? Is there generally meant to be backward compatibility or no?
I'll paste a couple of important files. You'd need our build system and a Kubernetes cluster to run them, which are harder to share, but maybe this is enough. The main script, prefect_submit.py, is called on a script containing a flow, like sample_flow.py below. The flow is packaged in a Docker container with all of its dependencies and pushed to an ECR repo, the Prefect client is created dynamically, and the deployment is submitted via the Python API. Other notes: there is a utils script that sets up the k8s YAMLs, and a shim Packager class that mimics your Docker packager, but rather than packaging it just points to the location of the script in the existing container. The 409 seems to occur during deployment creation. I'm using the newest Deployment class.
prefect_submit.py
import prefect.utilities.filesystem
prefect.utilities.filesystem.to_display_path.__code__ = (lambda x: str(x)).__code__
import asyncio
import os
import re
import prefect
from absl import app, flags
from common import log
from common.deploy import package, utils
from prefect.cli.cloud import build_url_from_workspace, get_cloud_client
from prefect.deployments import Deployment, FlowScript
from prefect.exceptions import ObjectAlreadyExists
from prefect.flow_runners import KubernetesFlowRunner
from prefect.packaging.base import Packager
from prefect.packaging.docker import DockerPackager
from pydantic import validator
FLAGS = flags.FLAGS
flags.DEFINE_boolean('gpu', True, 'Run the flow with a GPU image')
JOB_RESTART_POLICY = "Never"
IMAGE_PULL_POLICY = "Always"
def is_valid_target_name(target_name):
    mousera2_build_root = os.environ['MOUSERA2_ROOT']
    cmakecache = open(f'{mousera2_build_root}/CMakeCache.txt', 'r')
    log.check_notnone(cmakecache)
    is_valid = False
    for line in cmakecache:
        if target_name in line:
            is_valid = True
            break
    return is_valid

def remove_prefix(text, prefix):
    if text.startswith(prefix):
        return text[len(prefix):]
    return text

def infer_cmake_target_from_filename(python_file):
    """
    Args:
        python_file : the path to the python file containing the prefect flow program
    Returns:
        The dotted-cmake target name (e.g. storage.accounting.experiment_cost_report)
    Current heuristics in order are:
        1. Check if there is a cmake target with the same name as the python file (minus the .py extension).
    """
    mousera2_source_root = os.environ['MOUSERA2_SOURCE_ROOT']
    full_path = os.path.abspath(python_file)
    filename = os.path.basename(full_path)
    filename = filename.replace('.py', '')
    filepath = os.path.relpath(os.path.dirname(full_path), mousera2_source_root)
    filepath = filepath.replace('/', '.')
    target_name = f'{filepath}.{filename}'
    target_name = remove_prefix(target_name, 'build.')
    if not is_valid_target_name(target_name):
        target_name = None
    return target_name
def build_docker_env(script_target_name):
    mousera2_build_root = os.environ['MOUSERA2_ROOT']
    log.check_notnone(mousera2_build_root)
    base_image = get_worker_image(gpu=FLAGS.gpu)
    print("Building flow with base image: ", base_image)
    # NOTE: Prefect images must have an undefined entrypoint... this is done by setting entry=None below
    image_name, image_tag = package.package_in_docker(script_target_name, entry=None, base_image=base_image)
    uris = utils.upload(image_name, image_tag, image_tag, ['ecr'])
    uri = None
    if uris:
        uri = uris[0]
    return uri

async def set_workspace(api_key: str):
    cloud = get_cloud_client(api_key=api_key)
    workspaces = await cloud.read_workspaces()
    if len(workspaces) == 0:
        raise RuntimeError("No workspaces in prefect cloud, need to create a workspace at beta.prefect.io .")
    workspace = workspaces[0]
    # The adjacent string literals are concatenated, so each piece ends with a space.
    print(f"WARNING, defaulting to workspace '{workspace['workspace_handle']}'. "
          "The current version of beta.prefect.io only allows a single workspace. "
          "This will need to be changed in the future.")
    return build_url_from_workspace(workspace)

async def initialize_client():
    api_key = os.environ.get("PREFECT2_API_KEY")
    workspace_url = await set_workspace(api_key)
    client = prefect.client.OrionClient(api=workspace_url, api_key=api_key)
    try:
        await client.create_work_queue("kubernetes")
        print("Created work queue")
    except ObjectAlreadyExists:
        print("Work queue already exists")
    return client
class Packager(DockerPackager):
    image_reference: str
    image_flow_location: str

    @validator('image_reference')
    def set_image_ref(cls, v):
        return v

    @validator('image_flow_location')
    def set_flow_location(cls, v):
        return v

    def __new__(cls, *args, **kwargs):
        return super().__new__(cls)

    async def package(self, flow):
        return self.base_manifest(flow).finalize(image=self.image_reference,
                                                 image_flow_location=self.image_flow_location)

def get_worker_image(gpu=True):
    mousera2_source_root = os.environ['MOUSERA2_SOURCE_ROOT']
    cur_base_image_record = f'{mousera2_source_root}/ops/images/docker/runtime_{"gpu" if gpu else "cpu"}.txt'
    base_image = open(cur_base_image_record, 'r').read().strip()
    return base_image
def create_deployment_spec(flow_script, flow_image):
    RUNNER_ENV = {"WORKER_IMAGE": flow_image,
                  "USE_GPU": FLAGS.gpu,
                  "PREFECT_LOGGING_LEVEL": "DEBUG",
                  "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": True,
                  "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 5}
    print("FLOW SCRIPT", flow_script)
    return Deployment(
        flow=FlowScript(path=flow_script),
        flow_runner=KubernetesFlowRunner(image_pull_policy=IMAGE_PULL_POLICY,
                                         env=RUNNER_ENV),
        packager=Packager(image_reference=flow_image,
                          image_flow_location=f"/root/app/tools/eks/flows/{flow_script}")
    )

def get_account_id(client):
    account_id = re.search('accounts/(.*)/workspaces', str(client.api_url))
    if account_id:
        return account_id.group(1)
    else:
        raise NotImplementedError("Extracting user account ID failed, "
                                  "prefect has changed something in their API, need to update this.")

def get_workspace_id(client):
    workspace_id = re.search('workspaces/(.*)/', str(client.api_url))
    if workspace_id:
        return workspace_id.group(1)
    else:
        raise NotImplementedError("Extracting workspace ID failed, "
                                  "prefect has changed something in their API, need to update this.")

def format_flow_url(account_id, workspace_id, flow_run_id):
    return f"https://beta.prefect.io/account/{account_id}/workspace/{workspace_id}/flow-run/{flow_run_id}"

async def main(input_script: str, image_uri: str):
    client = await initialize_client()
    spec = create_deployment_spec(input_script, image_uri)
    # await spec.validate(client=client)
    deployment_id = await spec.create(client=client)
    run_info = await client.create_flow_run_from_deployment(deployment_id)
    account_id = get_account_id(client)
    workspace_id = get_workspace_id(client)
    flow_run_url = format_flow_url(account_id, workspace_id, run_info.state.state_details.flow_run_id)
    print(f"Your flow '{run_info.name}' is running at: ")
    print(flow_run_url)

def run(argv):
    script_name = argv[1]
    prefect_script_target_name = infer_cmake_target_from_filename(script_name)
    ecr_docker_image_uri = build_docker_env(prefect_script_target_name)
    asyncio.run(main(script_name, ecr_docker_image_uri))

if __name__ == "__main__":
    app.run(run)
flow_utils.py
import os
from dataclasses import dataclass
import yaml
from prefect.task_runners import BaseTaskRunner, ConcurrentTaskRunner
from prefect_dask import DaskTaskRunner
@dataclass
class ResourceConfig:
    always_pull_image: bool = False
    max_workers: int = 1
    min_workers: int = 1
    cpus_per_worker: int = 1
    threads_per_worker: int = 1
    memory_per_worker: str = "1G"
    gpus_per_worker: int = 1

def generate_task_runner(config: ResourceConfig) -> BaseTaskRunner:
    try:
        worker_image = os.environ['WORKER_IMAGE']
    except KeyError:
        return ConcurrentTaskRunner()
    gpu_limit = 0
    worker_name = "dask-worker"
    if os.environ["USE_GPU"].lower() in ('true', '1', 't'):
        worker_name = "dask-cuda-worker"
        gpu_limit = config.gpus_per_worker
    cmd = f"[{worker_name}, $(DASK_SCHEDULER_ADDRESS), --nthreads, '{config.threads_per_worker}', --no-dashboard, --memory-limit, {config.memory_per_worker}B]"
    image_pull_policy = "Always" if config.always_pull_image else "IfNotPresent"
    worker_spec_yaml = f"""
    kind: Pod
    spec:
      automountServiceAccountToken: false
      restartPolicy: OnFailure
      containers:
      - image: {worker_image}
        imagePullPolicy: {image_pull_policy}
        args: {cmd}
        name: "{worker_name}"
        resources:
          limits:
            cpu: "{config.cpus_per_worker}"
            memory: {config.memory_per_worker}
            nvidia.com/gpu: {gpu_limit}
          requests:
            cpu: "{config.cpus_per_worker}"
            memory: {config.memory_per_worker}
            nvidia.com/gpu: {gpu_limit}
    """
    scheduler_spec_yaml = f"""
    kind: Pod
    spec:
      automountServiceAccountToken: false
      restartPolicy: OnFailure
      containers:
      - image: {worker_image}
        imagePullPolicy: {image_pull_policy}
        args: [dask-scheduler]
        name: "dask-scheduler"
        resources:
          limits:
            cpu: "1"
            memory: 1G
          requests:
            cpu: "0.5"
            memory: 1G
    """
    task_runner = DaskTaskRunner(cluster_class="dask_kubernetes.KubeCluster",
                                 cluster_kwargs={
                                     "pod_template": yaml.safe_load(worker_spec_yaml),
                                     "scheduler_pod_template": yaml.safe_load(scheduler_spec_yaml)
                                 },
                                 adapt_kwargs={"maximum": config.max_workers, "minimum": config.min_workers})
    return task_runner
sample_flow.py
import time
from prefect import flow, task
import tools.eks.flows.flow_utils as flu
resources = flu.ResourceConfig(
    max_workers=10,
    min_workers=2,
    cpus_per_worker=2,
    memory_per_worker="14G"
)
task_runner = flu.generate_task_runner(resources)

"""
A Task is a single unit of work. 1 per worker with the above config.
"""
@task(retries=3)
def gpu_work(task_id):
    import torch
    matrix_size = (100, 100)
    # torch device strings are lowercase ("cpu", "cuda")
    device_name = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device_name)
    time.sleep(10)
    a = torch.rand(matrix_size, device=device)
    b = torch.rand(matrix_size, device=device)
    c = torch.matmul(a, b)
    print(f"TASK DEVICE: {device_name}")
    return task_id, device_name

"""
A Flow is a higher-level orchestration of work. Flows can have tasks and subflows.
"""
@flow(task_runner=task_runner)
def gpu_flow():
    try:
        TASK_COUNT = 1000
        print("STARTING FLOW", flush=True)
        results = []
        for task_id in range(TASK_COUNT):
            results.append(gpu_work(task_id))
        start = time.time()
        for result in results:
            print(result.result(), flush=True)
        duration = time.time() - start
        print(f"All done, took: {duration}", flush=True)
        return 'ok'
    except Exception as e:
        print("EXCEPTION", e)
Also note this piece of code at the top of prefect_submit.py
import prefect.utilities.filesystem
prefect.utilities.filesystem.to_display_path.__code__ = (lambda x: str(x)).__code__
It's required because Prefect died on symlinked flows when bundling them as a Deployment. Unrelated, just a small issue.
I updated to beta 12 and changed the flow_runner to the KubernetesJob concept, same 409 conflict error.
Thanks @caleb-recursion. What I need most is the Deployment spec you created that matches our latest version, with infrastructure instead of flow runners -- can you share that?
def base_job_manifest(envs):
    """Produces the bare minimum allowed Job manifest"""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"labels": {}},
        "spec": {
            "template": {
                "spec": {
                    "parallelism": 1,
                    "completions": 1,
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "prefect-job",
                            "env": [{"name": name, "value": value} for name, value in envs.items()],
                        }
                    ],
                }
            }
        },
    }

def create_deployment_spec(flow_script, flow_image):
    RUNNER_ENV = {"WORKER_IMAGE": flow_image,
                  "USE_GPU": FLAGS.gpu,
                  "PREFECT_LOGGING_LEVEL": "DEBUG",
                  "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": True,
                  "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 5}
    return Deployment(
        name=f"{flow_script.split('.')[0]}-{time.time()}",
        flow=FlowScript(path=flow_script),
        infrastructure=KubernetesJob(image_pull_policy=IMAGE_PULL_POLICY,
                                     job=base_job_manifest(RUNNER_ENV)),
        packager=Packager(image_reference=flow_image,
                          image_flow_location=f"/root/app/tools/eks/flows/{flow_script}")
    )
Thanks, @caleb-recursion -- this is useful, and what I meant was the Prefect Deployment definition that you used. The equivalent code (and the exact code that you're using) to this code that you shared previously:
def create_deployment_spec(flow_script, flow_image):
    RUNNER_ENV = {"WORKER_IMAGE": flow_image,
                  "USE_GPU": FLAGS.gpu,
                  "PREFECT_LOGGING_LEVEL": "DEBUG",
                  "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": True,
                  "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 5}
    print("FLOW SCRIPT", flow_script)
    return Deployment(
        flow=FlowScript(path=flow_script),
        flow_runner=KubernetesFlowRunner(image_pull_policy=IMAGE_PULL_POLICY,
                                         env=RUNNER_ENV),
        packager=Packager(image_reference=flow_image,
                          image_flow_location=f"/root/app/tools/eks/flows/{flow_script}")
    )
That's what I just shared ^, unless you want the full file again?
@caleb-recursion I think the root cause here is attempting to create a deployment using a custom OrionClient instance. There's a lot of logic for managing API keys / URLs that is ignored if you instantiate your own client. I can reproduce the specific issue here.
You're using a custom OrionClient object to create a deployment. As part of that creation, Prefect saves the corresponding infrastructure block. Saving a block triggers the instantiation of a Prefect client using default logic from the environment: https://github.com/PrefectHQ/orion/blob/main/src/prefect/blocks/core.py#L555. Based on the code, I don't think PREFECT_API_KEY or PREFECT_API_URL have been set for this to work correctly (e.g. I see you're using PREFECT2_API_KEY). So what happens is that the block is saved through that default client rather than through your custom one, and the deployment your client then creates refers to a block its workspace does not have.
A simple way to reproduce this:
import asyncio
from prefect.client import OrionClient
from prefect import flow, Deployment

@flow
def foo(x: int = 1):
    pass

d1 = Deployment(flow=foo, name="test")

async def main():
    # instantiate an orion client using custom api url and api key
    # note this is NOT THE SAME AS SET IN THE CURRENT PROFILE
    #
    # the current profile is empty
    client = OrionClient(
        api="<CORRECT API URL FOR CLOUD WORKSPACE>",
        api_key="<A WORKING API KEY>",
    )
    await d1.create()
    print("This one works fine")
    await d1.create(client=client)
    print("This one has an error")

if __name__ == "__main__":
    asyncio.run(main())
zach@Zachs-MacBook-Pro ~/p/orion (sketch-out-or-filters)> prefect profile use tmp && prefect profile inspect tmp (py39orion)
Profile 'tmp' now active.
Profile 'tmp' is empty.
zach@Zachs-MacBook-Pro ~/p/orion > python error.py (py39orion)
This one works fine
Traceback (most recent call last):
File "/Users/zach/prefect/orion/error.py", line 32, in <module>
asyncio.run(main())
File "/opt/homebrew/Caskroom/miniconda/base/envs/py39orion/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/opt/homebrew/Caskroom/miniconda/base/envs/py39orion/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/Users/zach/prefect/orion/user_error.py", line 27, in main
await d1.create(client=client)
File "/Users/zach/prefect/orion/src/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/zach/prefect/orion/src/prefect/deployments.py", line 216, in create
return await client.create_deployment(
File "/Users/zach/prefect/orion/src/prefect/client.py", line 1263, in create_deployment
response = await self._client.post(
File "/opt/homebrew/Caskroom/miniconda/base/envs/py39orion/lib/python3.9/site-packages/httpx/_client.py", line 1820, in post
return await self.request(
File "/opt/homebrew/Caskroom/miniconda/base/envs/py39orion/lib/python3.9/site-packages/httpx/_client.py", line 1506, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/Users/zach/prefect/orion/src/prefect/client.py", line 277, in send
response.raise_for_status()
File "/Users/zach/prefect/orion/src/prefect/client.py", line 223, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '409 Conflict' for url 'https://api-beta.prefect.io/api/accounts/<FOO>/workspaces/<FOO>/deployments/'
Response: {'detail': 'Data integrity conflict. This usually means a unique or foreign key constraint was violated. See server logs for details.'}
For more information check: https://httpstatuses.com/409
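As an analogy only (this is not Prefect's schema or server code), the "foreign key constraint" wording in the response matches what you'd see when a row is saved in one database and then referenced from another. The sketch below uses two in-memory SQLite databases as stand-ins for the default API and the custom client's API. The table names, `make_db`, and `create_deployment` are all hypothetical.

```python
import sqlite3

# Hypothetical two-table schema: a deployment must point at an existing block.
SCHEMA = """
CREATE TABLE block (id TEXT PRIMARY KEY);
CREATE TABLE deployment (
    name TEXT PRIMARY KEY,
    block_id TEXT NOT NULL REFERENCES block(id)
);
"""

def make_db() -> sqlite3.Connection:
    """Create an in-memory database standing in for one API backend."""
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves FK checks off by default
    conn.executescript(SCHEMA)
    return conn

def create_deployment(conn: sqlite3.Connection, name: str, block_id: str) -> str:
    """Insert a deployment row and map an integrity error to a 409-style result."""
    try:
        with conn:
            conn.execute("INSERT INTO deployment VALUES (?, ?)", (name, block_id))
        return "201 Created"
    except sqlite3.IntegrityError:
        return "409 Conflict"

default_api = make_db()  # where the block ends up being saved
custom_api = make_db()   # where the custom client sends the deployment

with default_api:
    default_api.execute("INSERT INTO block VALUES ('block-1')")

print(create_deployment(default_api, "test", "block-1"))  # block exists here
print(create_deployment(custom_api, "test", "block-1"))   # block is missing here
```

The second call fails because `block-1` was never written to the database that receives the deployment.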
In general, I would recommend avoiding custom use of OrionClient and using profiles to manage API configuration.
Thanks for explaining. Is it possible to achieve setting the API and workspace URL via a Python interface? Whether through a client, or profile?
I worked around it by calling a subprocess of the prefect login command. Thanks for finding the error.
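Roughly what that workaround looks like, as a sketch rather than the exact code: it shells out to the Prefect CLI before creating the deployment. The `--key` and `--workspace` flag names are assumptions here and should be checked against `prefect cloud login --help` for the installed version. `build_login_command` and `login` are hypothetical helper names.

```python
import subprocess
from typing import List

def build_login_command(api_key: str, workspace: str) -> List[str]:
    """Build the CLI invocation as an argument list (no shell involved)."""
    # Assumed flags: --key and --workspace; verify with `prefect cloud login --help`.
    return ["prefect", "cloud", "login", "--key", api_key, "--workspace", workspace]

def login(api_key: str, workspace: str) -> None:
    """Run the login command and raise CalledProcessError if it fails."""
    subprocess.run(build_login_command(api_key, workspace), check=True)
```

Passing the arguments as a list keeps the API key out of shell string interpolation.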
Nice!
For setting within Python, you can also use the temporary_settings context manager: https://orion-docs.prefect.io/api-ref/prefect/settings/#prefect.settings.temporary_settings
We'll definitely spend some time investigating ways to ensure the client is used consistently so things like this won't happen.
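One more option that stays in Python, sketched under assumptions: since the defaults come from PREFECT_API_URL and PREFECT_API_KEY in the environment, a launcher can copy the custom PREFECT2_API_KEY into the standard names for a child process. `prefect_env` and `run_with_prefect_env` are hypothetical helpers, and the API URL is whatever your workspace URL is.

```python
import os
import subprocess
from typing import Dict, List, Mapping

def prefect_env(base: Mapping[str, str], api_url: str) -> Dict[str, str]:
    """Return a copy of `base` with the standard Prefect variables filled in."""
    env = dict(base)
    env["PREFECT_API_URL"] = api_url
    # PREFECT2_API_KEY is the custom variable name used in prefect_submit.py.
    env["PREFECT_API_KEY"] = base["PREFECT2_API_KEY"]
    return env

def run_with_prefect_env(argv: List[str], api_url: str) -> int:
    """Run `argv` as a child process that sees the standard Prefect variables."""
    return subprocess.run(argv, env=prefect_env(os.environ, api_url)).returncode
```

For example, `run_with_prefect_env([sys.executable, "prefect_submit.py", "sample_flow.py"], workspace_url)` would start the submit script with both variables set, without touching the parent process's environment.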
I am using the Python API to submit a Deployment. The following migration error is thrown.