PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
16.26k stars 1.58k forks source link

Not able to run Kubernetes Job on a self-hosted Prefect Server #15793

Open cvatsops opened 1 day ago

cvatsops commented 1 day ago

Bug summary

We have deployed a self-hosted Prefect(3.0.9) in our Kubernetes Cluster through helm chart and I am trying to run a Kubernetes Job but facing an issue.

I am trying to run a k8 job using below code:

job = KubernetesJob.from_yaml_file(
    credentials=k8s_credentials,
    manifest_path="/usr/src/app/sample_file.yaml",
)
# Save the job for reuse
job.save("my-k8s-job", overwrite=True)
@flow
def kubernetes_orchestrator():
    run_namespaced_job(job)

and getting below error in when running run_namespaced_job(job):

File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/config/kube_config.py", line 413, in load_and_set
    self._load_cluster_info()
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/config/kube_config.py", line 387, in _load_cluster_info
    self.cert_file = FileOrData(
                     ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/config/kube_config.py", line 81, in __init__
    if data_key_name in obj:
       ^^^^^^^^^^^^^^^^^^^^
TypeError: argument of type 'NoneType' is not iterable 

Version info (prefect version output)

Prefect version is 3.0.9
Python version is 3.11

Additional context

No response

desertaxle commented 1 day ago

Thanks for opening an issue @cvatsops! Can you share what version of prefect-kubernetes you're using? You can get it by running prefect version which should give an output like this:

Version:             3.0.10
API version:         0.8.4
Python version:      3.12.5
Git commit:          3aa2d893
Built:               Tue, Oct 15, 2024 1:31 PM
OS/Arch:             darwin/arm64
Profile:             local
Server type:         server
Pydantic version:    2.9.2
Integrations:
  prefect-kubernetes: 0.5.2
cvatsops commented 21 hours ago

Hello @desertaxle Thanks for having a look into this issue. We are using Prefect-kubernetes version : 0.5.1

desertaxle commented 13 hours ago

Thanks @cvatsops! Could you share the entire stack trace from your error and also share a code example for how you are creating your KuberentesCredentials block? Both those would be helpful in figuring out where this error is coming from.

cvatsops commented 12 hours ago

Sure.

I am using below code :

from prefect_kubernetes.credentials import KubernetesCredentials
k8s_credentials = KubernetesCredentials.load("k8s-creds")

I have created a KubernetesCredentials block in Prefect UI called k8s-creds

And then this is the below code:

v1_job_metadata = create_namespaced_job(new_job=V1Job(yaml_manifest),kubernetes_credentials=k8s_credentials)
    # Define the Kubernetes Job block
    k8s_job = KubernetesJob(job=v1_job_metadata,namespace="commons")
    job = KubernetesJob(
        credentials=k8s_credentials,
        # v1_job=yaml_manifest,
        v1_job=yaml.safe_load(yaml_manifest),
        namespace="commons",
        delete_after_completion=True,
        interval_seconds=5,
        timeout_seconds=300
    )
    # Run the Kubernetes Job
    run_namespaced_job(job)

While running run_namespaced_job(job) , i am getting the below error:

08:31:30.591 | ERROR   | Flow run 'belligerent-raccoon' - Finished in state Failed("Flow run encountered an exception: TypeError: argument of type 'NoneType' is not iterable")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 1345, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 821, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 701, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 255, in result
    raise self._raised
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/usr/local/lib/python3.11/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/flows.py", line 40, in run_namespaced_job
    kubernetes_job_run = task(kubernetes_job.trigger)()
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/tasks.py", line 997, in __call__
    return run_task(
           ^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1512, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1325, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 457, in result
    raise self._raised
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 763, in run_context
    yield self
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 1323, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.11/site-packages/prefect/task_engine.py", line 786, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 399, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 243, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 225, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 389, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/jobs.py", line 534, in trigger
    await create_namespaced_job.fn(
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/jobs.py", line 58, in create_namespaced_job
    async with kubernetes_credentials.get_client("batch") as batch_v1_client:
  File "/usr/local/lib/python3.11/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/credentials.py", line 179, in get_client
    await config.load_kube_config_from_dict(
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/config/kube_config.py", line 643, in load_kube_config_from_dict
    await loader.load_and_set(client_configuration)
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/config/kube_config.py", line 413, in load_and_set
    self._load_cluster_info()
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/config/kube_config.py", line 387, in _load_cluster_info
    self.cert_file = FileOrData(
                     ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes_asyncio/config/kube_config.py", line 81, in __init__
    if data_key_name in obj:
       ^^^^^^^^^^^^^^^^^^^^
TypeError: argument of type 'NoneType' is not iterable