oss-aspen / 8Knot

Dash app in development to serve open source community visualizations using GitHub data from Augur. Hosted app: https://eightknot.osci.io
MIT License
47 stars 59 forks source link

Spike: Celery monitoring #643

Closed JamesKunstle closed 4 months ago

JamesKunstle commented 4 months ago

We'd like to know:

  1. How large the task backlog is at any given point in time,
  2. What the time-to-response is for a given task (task lifespan, including queue time),
  3. What the time-to-execute is for a given task (from start to end of actual execution),
  4. How much memory a worker is using, ... etc.

Most commonly, this is done by having Prometheus monitor a /metrics endpoint associated with the Celery workflow, and having Grafana visualize the data that Prometheus makes available. A custom dashboard can then be built, or one can be reused from the Grafana dashboard registry.

In this spike, we should try to put all of the pieces together to get monitoring working for a simple example queue and discuss how much effort it'll take to replicate the toy example for 8Knot.

JamesKunstle commented 4 months ago

This was our initial diagram of how we'd implement monitoring.

Screenshot 2024-02-12 at 4 48 02 PM

However, flower doesn't expose very interesting /metrics so we considered other options.

This exporter works with both the Redis and the RabbitMQ Celery broker flavors. It's something to consider in the future, but it's a relatively small project and it requires yet another thing to be added to our stack beyond just Prometheus and Grafana,

By replacing Redis with RabbitMQ as our broker, and using the rabbitmq-plugins enable rabbitmq_prometheus plugin, we get a /metrics endpoint for free at http://{rabbitmq_host}:15692. Prometheus can scrape that endpoint and collect good stats about the queue, generic to being used for Celery.

I wrote this small toy to populate the RabbitMQ queue for inspection:

# tasks.py

import celery
import time

app = celery.Celery(
    __name__,
    broker="amqp://localhost",
)

@app.task
def wait(wtime: int) -> int:
    print(f"Waiting: {wtime} seconds")
    time.sleep(wtime)

---
# app.py

import time
from tasks import wait

def main():

    # create 100 tasks to fill queue
    for i in range(100):
        print(f"creating task: {i}")
        wait.delay(i)

    # keep adding tasks to queue
    inc = 1
    while True:
        print(f"polling task {inc}")
        wait.delay(inc)
        inc = inc + 1
        if inc > 100:
            inc = 1
        time.sleep(inc)

if __name__ == "__main__":
    main()

With this application running, $ rabbitmq server online, prometheus scraping the rabbitmq /metrics endpoint, and grafana analysing the prometheus data source with the custom monitoring dashboard here we get a fairly good metrics dashboard.

Unfortunately, it's only answers our question about backlog-length because it isn't custom to Celery. Therefore, we should using a more Celery-specific exporter in the future, or building a more Celery-specific monitoring dashboard based on the RabbitMQ info made available.

JamesKunstle commented 4 months ago

Link to well-known Celery exporters that we could use in the future.