conductor-sdk / conductor-python

Conductor OSS SDK for Python programming language
Apache License 2.0

Memory Continually Increasing Intermittently #267

Open ryorke1 opened 3 weeks ago

ryorke1 commented 3 weeks ago

Hello everyone,

My team seems to have stumbled upon a really weird issue with the Conductor Python SDK where the memory of the worker continues to climb until the worker runs out of resources (in our case our OpenShift cluster OOMKills the pod because its resource utilization exceeds its allocated quota). What is odd is that the timing of the memory increase is not consistent. Sometimes it starts as soon as the worker starts, while other times it starts 30 to 45 minutes after the application starts. Additionally, once in a while the issue does not appear at all. I have tried tracing through the polling functionality within the SDK and cannot find anything that stands out as the cause of the memory growth. Is anyone else experiencing this issue, and what is the root cause?

memory-leak

This graph shows the example code below running and being restarted. The long flat lines are the periods when we turned it off and looked at the code. Considering this worker does absolutely nothing (execute doesn't even get called), I wouldn't expect to see a bunch of spikes in memory; this should be flat for the most part.

Steps to reproduce:

  1. Setup a conductor server with a generic workflow (doesn't have to be anything specific)
  2. Deploy a pod with any number of workers greater than 1 (in our case we did this with 12 workers) using the code shown below.
  3. Ensure that the only package installed (besides base packages) is conductor-python.
  4. Monitor the memory
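For step 4, a minimal sketch of one way to sample memory from inside the container, assuming a Linux pod where `/proc` shows only the container's own processes. The helper names `parse_vm_rss_kib` and `total_rss_kib` are hypothetical and are not part of conductor-python. Shared pages are counted once per process, so the sum can overstate what the cluster reports.

```python
import os


def parse_vm_rss_kib(status_text):
    """Return the VmRSS value (KiB) from the text of /proc/<pid>/status, or None."""
    for line in status_text.splitlines():
        if line.startswith("VmRSS:"):
            return int(line.split()[1])
    return None  # kernel threads have no VmRSS line


def total_rss_kib(proc_root="/proc"):
    """Sum VmRSS over every process visible under proc_root (Linux only)."""
    total = 0
    for entry in os.listdir(proc_root):
        if not entry.isdigit():
            continue  # skip non-PID entries such as "meminfo"
        try:
            with open(os.path.join(proc_root, entry, "status")) as fh:
                total += parse_vm_rss_kib(fh.read()) or 0
        except OSError:
            continue  # the process exited between listdir and open
    return total
```

Logging `total_rss_kib()` once a minute from a sidecar loop would give a per-pod series that can be compared against the cluster graph.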

Sample Code:

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models import Task, TaskResult
from conductor.client.worker.worker_interface import WorkerInterface

configuration = Configuration(
    server_api_url=f"http://localhost:3500/", debug=True
)

class SillyWorker(WorkerInterface):
    def execute(self, task: Task) -> TaskResult | None:
        return None

    def get_polling_interval_in_seconds(self) -> float:
        return 1

workers = [
    SillyWorker(task_definition_name="silly-worker-1"),
    SillyWorker(task_definition_name="silly-worker-2"),
    SillyWorker(task_definition_name="silly-worker-3"),
    SillyWorker(task_definition_name="silly-worker-4"),
    SillyWorker(task_definition_name="silly-worker-5"),
    SillyWorker(task_definition_name="silly-worker-6"),
    SillyWorker(task_definition_name="silly-worker-7"),
    SillyWorker(task_definition_name="silly-worker-8"),
    SillyWorker(task_definition_name="silly-worker-9"),
    SillyWorker(task_definition_name="silly-worker-10"),
    SillyWorker(task_definition_name="silly-worker-11"),
    SillyWorker(task_definition_name="silly-worker-12"),
]

def start_workers():
    with TaskHandler(workers, configuration) as task_handler:
        task_handler.start_processes()
        task_handler.join_processes()

if __name__ == "__main__":
    start_workers()

Environment:

c4lm commented 2 weeks ago

Hey @ryorke1, Do you have some kind of proxying for that server_api_url? It's supposed to end with /api because orkes-conductor-server image has proxying for /api internally for API queries - do the logs show that the workers are polling? Does the pod have enough CPU for garbage collection? Have you tried newer or older python images? IIRC there were changes in 3.11 which were supposed to bring more performance at the cost of a bit more memory, and those changes also initially caused a few leaks in some libraries.

I would appreciate it if you could share the Dockerfile and compose YAML for the worker, with CPU and memory capped, so that we could use them to reproduce the issue. It does not seem to occur on my machine with Python 3.11.9 and conductor-python 1.1.7.

ryorke1 commented 2 weeks ago

Hi @c4lm

here is the dockerfile:

FROM python:3.11

COPY worker.py /app
COPY requirements.txt /app

WORKDIR /app

RUN pip install -r requirements.txt && \
    useradd -m worker

USER worker
CMD ["python", "worker.py"]

There is no compose file as we are running this in OpenShift. Once this image is built we push it into our image repository and deploy it using a generic helm deployment manifest. As for the URL, there is no proxy. In this case the test code was running it on localhost specifically on port 3500. I forgot to add the /api portion in the example as I was cleaning up the code to remove stuff to simplify the example.

Here is the manifest we are using for this testing in OpenShift:

kind: Pod
apiVersion: v1
metadata:
  name: conductor-testing
  namespace: default
spec:
  containers:
    - resources:
        limits:
          cpu: 4
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 400Mi
      name: conductor-testing
      imagePullPolicy: IfNotPresent
      image: 'python:3.11'

We have also tried to reproduce this locally in docker and it takes a bit of restarting and leaving it running but eventually it will have this issue. It's not consistent like it is on OpenShift.

Here is a snippet of the logs received from the example script above:

2024-06-13 13:40:57,530 [193] conductor.client.automator.task_handler INFO     TaskHandler initialized
2024-06-13 13:40:57,531 [193] conductor.client.automator.task_handler INFO     Starting worker processes...
2024-06-13 13:40:57,533 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-1 with domain None with polling interval 1
2024-06-13 13:40:57,534 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-2 with domain None with polling interval 1
2024-06-13 13:40:57,536 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-4 with domain None with polling interval 1
2024-06-13 13:40:57,631 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-10 with domain None with polling interval 1
2024-06-13 13:40:57,723 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-3 with domain None with polling interval 1
2024-06-13 13:40:57,723 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-5 with domain None with polling interval 1
2024-06-13 13:40:57,724 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-6 with domain None with polling interval 1
2024-06-13 13:40:57,725 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:57,725 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:57,728 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-1?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:57,728 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-4?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:57,724 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-7 with domain None with polling interval 1
2024-06-13 13:40:57,823 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-1, worker_id: conductor-testing, domain: None
2024-06-13 13:40:57,823 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-9 with domain None with polling interval 1
2024-06-13 13:40:57,824 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-4, worker_id: conductor-testing, domain: None
2024-06-13 13:40:57,825 [193] conductor.client.automator.task_handler INFO     Started 12 TaskRunner process
2024-06-13 13:40:57,826 [193] conductor.client.automator.task_handler INFO     Started all processes
2024-06-13 13:40:57,923 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-11 with domain None with polling interval 1
2024-06-13 13:40:57,923 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-8 with domain None with polling interval 1
2024-06-13 13:40:57,924 [193] conductor.client.automator.task_runner INFO     Polling task silly-worker-12 with domain None with polling interval 1
2024-06-13 13:40:57,925 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:57,928 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-2?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:57,929 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-2, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,124 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:58,124 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:58,127 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-3?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,127 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-11?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,128 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-11, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,128 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-3, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,223 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:58,323 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:58,325 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:58,325 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:58,424 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-7?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,425 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:58,428 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:58,524 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-7, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,524 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-9?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,525 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-9, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,531 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-10?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,532 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-10, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,532 urllib3.connectionpool DEBUG    Starting new HTTP connection (1): localhost:3500
2024-06-13 13:40:58,532 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-5?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,532 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-12?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,534 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-8?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,535 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-8, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,532 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-6?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,623 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-5, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,623 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-12, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,624 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-6, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,826 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-1?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,826 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-4?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,827 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-1, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,827 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-4, worker_id: conductor-testing, domain: None
2024-06-13 13:40:58,932 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-2?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:58,933 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-2, worker_id: conductor-testing, domain: None
2024-06-13 13:40:59,131 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-11?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:59,131 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-11, worker_id: conductor-testing, domain: None
2024-06-13 13:40:59,131 urllib3.connectionpool DEBUG    http://localhost:3500 "GET /api/tasks/poll/silly-worker-3?workerid=conductor-testing HTTP/1.1" 200 2
2024-06-13 13:40:59,132 [193] conductor.client.automator.task_runner DEBUG    Polled task: silly-worker-3, worker_id: conductor-testing, domain: None
...
Process Process-9:
Process Process-10:
Process Process-3:
Process Process-4:
Process Process-13:
Process Process-2:
Process Process-12:
Process Process-8:
Process Process-5:
Process Process-11:
Process Process-6:
Process Process-7:
Process Process-1:
2024-06-13 13:41:10,112 [193] conductor.client.automator.task_handler INFO     KeyboardInterrupt: Stopping all processes
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 195
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 196
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 197
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 198
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 199
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 200
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 201
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 202
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 203
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 204
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 205
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 206
2024-06-13 13:41:10,113 [193] conductor.client.automator.task_handler INFO     Stopped worker processes...
2024-06-13 13:41:10,123 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 195
2024-06-13 13:41:10,123 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 196
2024-06-13 13:41:10,123 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 197
2024-06-13 13:41:10,123 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 198
2024-06-13 13:41:10,123 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 199
2024-06-13 13:41:10,123 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 200
2024-06-13 13:41:10,123 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 201
2024-06-13 13:41:10,123 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 202
2024-06-13 13:41:10,124 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 203
2024-06-13 13:41:10,124 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 204
2024-06-13 13:41:10,124 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 205
2024-06-13 13:41:10,124 [193] conductor.client.automator.task_handler DEBUG    Terminating process: 206
2024-06-13 13:41:10,124 [193] conductor.client.automator.task_handler INFO     Stopped worker processes...

With regard to CPU utilization, we have given the pod enough resources, based on the pod's metrics, that the CPU limit shouldn't be an issue, as shown in this graph. Right now we are allowing the pod to utilize 1/5 of a CPU, and the pod never exceeds this limit. That being said, I increased the pod to 4 CPUs, ran it for a short period of time, and still saw the same results.

image Before increasing CPU limits

image After increasing CPU limits to 4 cpus

With regard to different versions of the conductor-python SDK, we have tried going back to 1.1.4 but still see the same issue. As for the Python version itself, I had not yet tried downgrading from 3.11 to 3.10. I just tried that and saw no change. I also tried 3.12 and got the same results.

image Using python 3.10.14

image Using python 3.12.4

Please let me know if there is anything else I can provide to assist!

c4lm commented 2 weeks ago

@ryorke1 having a bit of a replication crisis over here

Using my compose file, memory usage consistently stays under 97 MB (CPU under 10%) according to docker stats if I let it run for a while. That is even lower than when running locally on my machine! I also did a bunch of runs with memray locally again.

image

I can see a small increase in RSS, but I am not sure it indicates anything, since memory was not bounded. At best, this memory graph of one of the forked worker processes might serve as weak evidence of a small native memory leak when running on my Mac (which does not happen when running in Docker): the highest peak was 8 MB higher than the starting peak, while the Python heap was stable. The heap changed, of course, but its peaks stayed the same, at around 380 KB. The highest total amount of memory attributed to a single line was 108 KiB (I ran memray with --trace-python-allocators and with -Xdev on python3).
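As a cross-check on the "Python heap was stable" observation, here is a minimal sketch that uses the standard library's tracemalloc in place of memray (a swapped-in technique, not what was run above). The `top_growth` helper and the deliberately retained `retained` list are illustrative assumptions. If RSS climbs while a comparison like this shows no growing line, the growth is more likely in native allocations.

```python
import tracemalloc


def top_growth(before, after, limit=5):
    """Return (location, size_diff_bytes) for the lines whose allocations changed most."""
    stats = after.compare_to(before, "lineno")
    return [(str(stat.traceback), stat.size_diff) for stat in stats[:limit]]


tracemalloc.start()
before = tracemalloc.take_snapshot()

# Stand-in for a leak: about 1 MB of buffers that stay referenced.
retained = [bytearray(1024) for _ in range(1000)]

after = tracemalloc.take_snapshot()
growth = top_growth(before, after)
tracemalloc.stop()

for location, diff in growth:
    print(f"{diff:>10} B  {location}")
```

In a worker, the two snapshots would be taken minutes apart inside one forked process rather than around a synthetic allocation.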

Everything is complicated by CPython having version-specific leak issues and platform-specific leak issues, and then the libraries having them too (e.g. this one, and it's not the only one).

Regarding the libraries we use here - metrics are disabled, so it can't be prometheus-client, connection is http, so it can't be SSL-related leaks. So that leaves us with the following options:

  1. rest of the SDK
  2. logging
  3. our client library - requests/urllib3
  4. something from the above and our usage of multiprocessing

Usually whenever I encounter leaks if http requests are involved, it's 1 or 3 (regardless of the language involved), so for now we could stick to that assumption. Rest client is per process, session is per rest client and we do not close it because it is reused, so we should not be leaking there... I will try to do more runs, and if I still can't replicate, I will try to create a reduced example (essentially just multiprocessing + requests calls) for you to test and reach out. In the meantime, could you please try the following change (probably should not change anything, but what if):

import requests
from requests.adapters import HTTPAdapter

configuration = Configuration(
    server_api_url="http://localhost:3500/api", debug=False
)
# Cap the session's connection pool at a single connection.
session = requests.Session()
session.mount("http://", HTTPAdapter(pool_connections=1, pool_maxsize=1))
configuration.http_connection = session
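The reduced example mentioned above (essentially just multiprocessing plus requests calls) might look roughly like the sketch below. It is an assumption about the shape of such a test, not code from the SDK: `poll_loop`, `run_pollers` and the `session_factory` parameter are hypothetical names, and requests is imported lazily so the loop can be exercised with a stand-in session.

```python
import multiprocessing as mp
import time


def poll_loop(url, iterations, interval_s, session_factory=None):
    """Poll url with one reused session; return the number of GETs that did not raise."""
    if session_factory is None:
        import requests  # third-party; only needed for the real run
        session_factory = requests.Session
    session = session_factory()
    ok = 0
    for _ in range(iterations):
        try:
            session.get(url, timeout=5)
            ok += 1
        except Exception:
            pass  # failed polls are ignored; only memory behaviour matters here
        time.sleep(interval_s)
    return ok


def run_pollers(url, n_procs=12, iterations=3600, interval_s=1.0):
    """Start one polling process per simulated worker and wait for all of them."""
    procs = [
        mp.Process(target=poll_loop, args=(url, iterations, interval_s))
        for _ in range(n_procs)
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
```

Calling `run_pollers("http://localhost:3500/api/tasks/poll/silly-worker-1")` under an `if __name__ == "__main__":` guard would mimic the 12 workers from the report. If memory stays flat here but not with the SDK, suspicion shifts away from requests/urllib3 and towards the rest of the SDK.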

Additionally, it would be great if you could try running the script with memray and capture the results, e.g.:

python3 -m memray run --follow-fork leak.py

It will generate 14 .bin files (1 for the parent, 1 for the logger, 12 for the workers). You can use these files to create reports, e.g.:

python3 -m memray flamegraph --leaks memray-leak.py.49244.bin.49260

You could also try running:

python3 -Xdev -m memray run --follow-fork --trace-python-allocators leak.py

but beware, it produces large binary files.
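Since a run produces 14 capture files, a small helper can build the report command for each one. This is a sketch under assumptions: the `flamegraph_commands` name and the `memray-` filename prefix are inferred from the example filename above, and only the `flamegraph --leaks` invocation quoted above is used.

```python
import glob
import os
import sys


def flamegraph_commands(directory, prefix="memray-"):
    """Build one `memray flamegraph --leaks` command per capture file in directory."""
    pattern = os.path.join(directory, prefix + "*.bin*")
    # Skip any HTML reports already generated next to the captures.
    captures = sorted(p for p in glob.glob(pattern) if not p.endswith(".html"))
    return [
        [sys.executable, "-m", "memray", "flamegraph", "--leaks", path]
        for path in captures
    ]
```

Each returned list can be passed to `subprocess.run(cmd, check=False)` to generate the reports one by one.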

What I was using in my docker-based attempts:

---
version: '3.9'

services:

  worker-1:
    image: "pythonleak"
    build:
      context: ./
      dockerfile: ./Dockerfile
    networks:
      - internal
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: 1G

networks:
  internal:

Dockerfile:

FROM python:3.12

RUN mkdir app

COPY ./leak.py /app/

WORKDIR /app

RUN pip install conductor-python && \
    useradd -m worker

USER worker
CMD ["python", "leak.py"]

leak.py:

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models import Task, TaskResult
from conductor.client.worker.worker_interface import WorkerInterface

configuration = Configuration(
    server_api_url=f"http://host.docker.internal:8080/api", debug=True
)

class SillyWorker(WorkerInterface):
    def execute(self, task: Task) -> TaskResult | None:
        return None

    def get_polling_interval_in_seconds(self) -> float:
        return 1

workers = [
    SillyWorker(task_definition_name="silly-worker-1"),
    SillyWorker(task_definition_name="silly-worker-2"),
    SillyWorker(task_definition_name="silly-worker-3"),
    SillyWorker(task_definition_name="silly-worker-4"),
    SillyWorker(task_definition_name="silly-worker-5"),
    SillyWorker(task_definition_name="silly-worker-6"),
    SillyWorker(task_definition_name="silly-worker-7"),
    SillyWorker(task_definition_name="silly-worker-8"),
    SillyWorker(task_definition_name="silly-worker-9"),
    SillyWorker(task_definition_name="silly-worker-10"),
    SillyWorker(task_definition_name="silly-worker-11"),
    SillyWorker(task_definition_name="silly-worker-12"),
]

def start_workers():

    with TaskHandler(workers, configuration) as task_handler:
        task_handler.start_processes()
        task_handler.join_processes()

if __name__ == "__main__":
    start_workers()

ryorke1 commented 1 week ago

Good afternoon. Just want to follow up to let you know I am still doing some testing. So far I have tried with just a generic Session() object and haven't had any memory issues (it stayed around 400 MB for a day). I also tried with a generic HTTPAdapter() and it also stayed flat. I am now trying with the Retry() object from urllib3 to see what happens. I am starting with the same setup as the codebase to see if it reproduces the issue, and then I will try varying these settings to see if one of them is causing it. Just wanted to post to keep the thread alive. Hopefully I will have more details very soon.