PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Possible memory leak when using DaskExecutor in Kubernetes #3966

Closed joelluijmes closed 1 year ago

joelluijmes commented 3 years ago

Description

I'm running Prefect in Kubernetes. I have a flow that spawns 15 CPU-intensive Kubernetes jobs. To run them in parallel, the flow is configured with a DaskExecutor (6 workers, 1 thread each).

What I see is that the prefect-job created by the Kubernetes agent uses quite a lot of resources, and its usage grows over time. See the attached screenshot (image).

Note: I’m talking about the job created by the prefect agent. The actual job executing code is only using 188MiB.

It seems like there is a possible memory leak in either Prefect or Dask. Is there a better way to deploy this that uses fewer resources?

Expected Behavior

First of all, I wouldn't expect memory usage this high. More importantly, I wouldn't expect it to grow indefinitely. In this instance the flow hasn't (yet) been killed by Kubernetes due to high memory usage, but that is something I have run into.

Reproduction

Reproduction is a bit tricky because this flow is generated from a config file when the script runs. I think I have included all the relevant bits below:

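# Imports assumed for Prefect 0.14.x (not part of the original snippet)
from datetime import datetime

from prefect import Flow
from prefect.engine import state
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.schedules import CronSchedule
from prefect.storage import Docker
from prefect.utilities.notifications import slack_notifier

executor = DaskExecutor(cluster_kwargs={"n_workers": 6, "threads_per_worker": 1, "memory_limit": "1GB"})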
schedule = CronSchedule(cron, start_date=datetime.utcnow())
handler = slack_notifier(only_states=[state.Failed])
storage_gcr = Docker(
    base_image="prefecthq/prefect:0.14.1-python3.8",
    registry_url="...",
    python_dependencies=["pandas"],
)
with Flow(
    f"CloudSQL to BigQuery ({cron})",
    storage=storage_gcr,
    executor=executor,
    schedule=schedule,
    run_config=KubernetesRun(
        # prefect_job_template contains a job spec describing resource limits, secrets, nodeselector..
        job_template=prefect_job_template
    ),
) as flow:
    # Class with @resource_manager which spawns off a pod and service to run CloudSQL Proxy
    with CloudSQLProxyManager(...) as service_name:
        # Job which calls RunNamespacedJob.run() on run
        job = CreateSyncJob(
            job_spec_template,
            state_handlers=[cleanup_on_failure, handler],
        )
        # Link the dependency of task to the CloudSQLProxyManager
        task = job(service_name)

Visualizing one of the flows shows: image

Environment

Running Prefect Server 0.14.0 on Kubernetes 1.18.x in Google Cloud.

Originally reported on slack: https://prefect-community.slack.com/archives/CL09KU1K7/p1610706804087800

jcrist commented 3 years ago

I think dask and kubernetes are unlikely to be the culprits here, rather I suspect your CreateSyncJob (or the RunNamespacedJob) task accumulates memory over time. Since these tasks are mostly IO bound, you might try running with a LocalDaskExecutor instead to eliminate dask.distributed from consideration.

# Swapping out executor for this should do it
executor = LocalDaskExecutor(num_workers=6)

joelluijmes commented 3 years ago

I just tried it with the LocalDaskExecutor, and the initial results are much more promising. Although the job has only been running for 10 minutes so far, the memory usage is much lower and actually constant.

image

Here is another screenshot, of a flow that used the DaskExecutor and ran for many hours until it was finally OOMKilled:

image

So it does seem something may be off with dask?

--

Anyhow, for reference, here is my implementation of CreateSyncJob. It takes a templated Kubernetes job spec, which is filled in by template_job_spec (i.e. setting the job name, environment variables, etc.), and then calls RunNamespacedJob.

class CreateSyncJob(Task):
    """
    Task which runs Kubernetes job which sync database from CloudSQL to BigQuery.

    Args:
      - job_spec_template (str): content of sql_to_bigquery.job.yaml.
      - spec (pd.Series): row of parse_specs_to_pandas containing sync variables.
    """

    def __init__(self, job_spec_template, spec, **kwargs):
        """
        Task which runs Kubernetes job which sync database from CloudSQL to BigQuery.

        Args:
        - job_spec_template (str): content of sql_to_bigquery.job.yaml.
        - spec (pd.Series): row of parse_specs_to_pandas containing sync variables.
        """
        self.job_spec_template = job_spec_template
        self.spec = spec

        super().__init__(
            name=spec["sourceDatabase"],
            **kwargs,
        )

    def run(self, cloudsql_name: Task):
        """
        Templates the job spec and creates the Kubernetes job using Prefect's RunNamespacedJob.

        Args:
          - cloudsql_name (Task): output of the CloudSQLProxyManager (or generate_cloudsql_name).
        """
        job_spec = template_job_spec(cloudsql_name, self.job_spec_template, self.spec)
        self.logger.info(f"Templated job_spec for {self.name}")
        self.logger.debug(job_spec)

        RunNamespacedJob(job_spec, namespace="prefect", kubernetes_api_key_secret=None).run()

Is there any benefit for using the DaskExecutor? Otherwise I'm fine with using the LocalDaskExecutor, which uses a ThreadPool instead of dask right? If you need more information to diagnose this, I'm willing to research it 👍

jcrist commented 3 years ago

Hmmm, that's interesting. I'd still be surprised if this is a bug in dask. A larger reproducible example I could run locally would definitely help. If you drop the with CloudSQLProxyManager bit and only run some dummy jobs (say a long sleep task) are you still able to reproduce the issue? If so, that'd help me for debugging locally.

> Is there any benefit for using the DaskExecutor? Otherwise I'm fine with using the LocalDaskExecutor, which uses a ThreadPool instead of dask right?

For your flow above, I'd expect using a LocalDaskExecutor with threads to be sufficient. The tasks you're running are IO bound (so will run fine in threads), and should be low in memory usage. See https://docs.prefect.io/orchestration/flow_config/executors.html#choosing-an-executor for our docs on selecting an executor to use.
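As a rough illustration of why threads are enough for IO-bound work, here is a minimal standard-library sketch. It is not Prefect code, and `io_bound_job` and `run_in_threads` are hypothetical names. The assumption is that each task mostly waits (as a task polling a Kubernetes job would), so six waiting jobs on six threads should finish in roughly the time of one:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_bound_job(seconds: float) -> float:
    """Stand-in for an IO-bound task, e.g. waiting on a remote job to finish."""
    time.sleep(seconds)  # sleeping releases the GIL, like a blocking network call
    return seconds


def run_in_threads(n_jobs: int, workers: int, seconds: float) -> float:
    """Run n_jobs sleeping jobs on a thread pool and return elapsed wall time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Consume the iterator so that every job has completed before timing stops.
        list(pool.map(io_bound_job, [seconds] * n_jobs))
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = run_in_threads(n_jobs=6, workers=6, seconds=0.2)
    print(f"6 jobs of 0.2s on 6 threads took {elapsed:.2f}s")
```

CPU-bound Python code would not overlap this way in threads, which is the usual reason to reach for processes or a distributed cluster instead.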

novotl commented 3 years ago

👋 We have a similar setup, running Prefect in Kubernetes and using DaskExecutor for our flow with 8 workers. All workers eat more and more memory until all of them OOM. After a bit of trial and error, we added these env vars to the Dask workers' YAML, which slows down the leak considerably. Credit to https://stackoverflow.com/questions/63680134/growing-memory-usage-leak-in-dask-distributed-profiler/63680548#63680548

DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL=10000ms 
DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=1000000ms
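For readers unfamiliar with these names: Dask reads configuration from environment variables that start with `DASK_`, lowercasing the rest and treating a double underscore as a nesting separator. The sketch below only mimics that naming rule in plain Python to show which config keys the two variables above correspond to. `dask_env_to_config_key` is a hypothetical helper, not a Dask function.

```python
def dask_env_to_config_key(name: str) -> str:
    """Mimic Dask's naming rule: strip 'DASK_', lowercase, '__' becomes '.'."""
    prefix = "DASK_"
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} is not a DASK_-prefixed variable")
    return name[len(prefix):].lower().replace("__", ".")


# The two variables from the comment above, as they would appear in a pod's env.
worker_env = {
    "DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL": "10000ms",
    "DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE": "1000000ms",
}

config = {dask_env_to_config_key(k): v for k, v in worker_env.items()}

if __name__ == "__main__":
    for key, value in config.items():
        print(f"{key} = {value}")
```

Both keys belong to the worker's built-in profiler, so the settings make it sample and roll over its data far less often.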

Other related links:

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions[bot] commented 1 year ago

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.

charalamm commented 4 months ago

Hello,

I think I am seeing this issue, and it does not seem to be solved either by setting the environment variables DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL and DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE or by using the LocalDaskExecutor.

We are using Prefect 1.4.

A minimal working example:

from prefect import task, Flow, run_configs, storage, executors

from time import sleep
from random import randint

@task()
def mem_n_slp(x):
    sleep(x)
    # Make the task fail randomly with 5% chance
    if randint(0, 100) < 5:
        raise ValueError("Random failure")
    return x

@task()
def just_slp(x):
    sleep(5 - x)
    return x

if __name__ == "__main__":
    project_name = "test-flows"
    flow_name = "test-ram-release"
    with Flow(
        name=flow_name,
        storage=storage.Azure(
            container="container-name",
            stored_as_script=False,
            connection_string_secret="AZURE_STORAGE_CONNECTION_STRING",
        ),
        run_config=run_configs.KubernetesRun(
            # We actually use a different image with the same Prefect version,
            # so apologies if this image does not work.
            image="prefecthq/prefect:1.4.0",
            env={
                "PREFECT__CLOUD__USE_LOCAL_SECRETS": True,
                "DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL": "10000ms",
                "DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE": "1000000ms",
            },
            image_pull_policy="Always",
            cpu_limit="0.5",
            cpu_request="0.5",
            memory_request="0.5Gi",
            memory_limit="0.5Gi",
            job_template={
                "apiVersion": "batch/v1",
                "kind": "Job",
                "spec": {
                    "ttlSecondsAfterFinished": 600,
                    "template": {
                        "metadata": {
                            "annotations": {
                                "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
                            },
                            "labels": {
                                "project-name": project_name,
                                "flow-name": flow_name,
                            },
                        },
                        "spec": {
                            "containers": [{"name": "flow"}],
                            "nodeSelector": {"prefect_label": "job_pod_node"},
                        },
                    },
                },
            }
        )
    ) as current_flow:
        A = mem_n_slp.map(1000 * [1] + 1000 * [2])
        B = mem_n_slp.map(A)
        C = mem_n_slp.map(B)
        D = mem_n_slp.map(C)
        E = mem_n_slp.map(D)
        F = just_slp.map(E)

    current_flow.executor = executors.dask.LocalDaskExecutor(num_workers=250)
    current_flow.register(project_name=project_name)

I tried doubling and halving the values of DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL and DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE, but with no luck.

We are observing a steady increase of memory up to the point that the container gets OOMKilled.
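To confirm a steady increase from inside the container rather than from cluster dashboards, one option is to log the process's resident memory at intervals. This is a minimal sketch assuming a Unix-like (ideally Linux) container. `rss_mib` and `sample_rss` are hypothetical names and nothing here is Prefect API. It could be called from within a task or from a background thread:

```python
import os
import resource
import sys
import time


def rss_mib() -> float:
    """Return this process's resident memory in MiB.

    Reads the current RSS from /proc on Linux; elsewhere it falls back to
    the *peak* RSS, which never decreases.
    """
    try:
        with open("/proc/self/statm") as f:
            resident_pages = int(f.read().split()[1])  # second field: resident pages
        return resident_pages * os.sysconf("SC_PAGE_SIZE") / 2**20
    except OSError:
        peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        # ru_maxrss is reported in bytes on macOS and in kilobytes on Linux.
        return peak / (2**20 if sys.platform == "darwin" else 2**10)


def sample_rss(n_samples: int, interval_s: float) -> list:
    """Collect n_samples RSS readings, pausing interval_s seconds after each."""
    samples = []
    for _ in range(n_samples):
        samples.append(rss_mib())
        time.sleep(interval_s)
    return samples


if __name__ == "__main__":
    for i, mib in enumerate(sample_rss(n_samples=5, interval_s=1.0)):
        print(f"sample {i}: {mib:.1f} MiB resident")
```

Readings that keep climbing while the number of in-flight tasks stays flat would point at accumulation in the flow-runner process itself rather than in the tasks' own working data.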

charalamm commented 4 months ago

When we run the flow without the KubernetesRun, the flow completes.

cicdw commented 4 months ago

Hi @charalamm - Prefect 1.x is no longer under active development; please upgrade to 2.x or 3.x.

charalamm commented 4 months ago

@cicdw Thanks for your response. Is this a known problem with Prefect 1.x, though, and is there a known solution?