mantiumai / chirps

Discover sensitive/confidential information stored in a vector database
GNU General Public License v3.0

Periodic Tasks & Optimize Worker Health Check #182

Closed zimventures closed 10 months ago

zimventures commented 10 months ago

The work done in #146 added an enhanced UI component for determining whether the Celery workers are available. Unfortunately, the inspection call made through app.control.inspect() is synchronous and takes upwards of 5 seconds to return. Showing the loading spinner for that long on every page load is not a desirable user experience!

We will eventually need the ability to run scans on a schedule. A package like django-celery-beat is well suited to this, since it builds on the existing Celery functionality to run tasks periodically. This ticket will add support for that, along with the first interval-based task: the resource scanner.

Periodic Task Configuration

A beat_schedule entry will be added to the celery.py file to configure the new periodic task.

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Execute the ping task every minute
    'celery-worker-ping-1min': {
        'task': 'worker.tasks.ping_task',
        'schedule': crontab(minute='*/1'),
    },
}
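For a fixed one-minute cadence, the schedule could also be expressed as a plain interval, since Celery's beat schedule accepts a number of seconds or a timedelta in place of a crontab. The sketch below only builds the dictionary; the entry name is hypothetical, and assigning the result to app.conf.beat_schedule is left to celery.py.

```python
from datetime import timedelta

# Hypothetical alternative to the crontab entry: run the ping task on a
# fixed 60-second interval rather than on wall-clock minute boundaries.
beat_schedule = {
    'celery-worker-ping-60s': {
        'task': 'worker.tasks.ping_task',
        'schedule': timedelta(seconds=60),
    },
}
```

Either form should be adequate for a health check; the crontab version fires on the minute, while the interval version is counted relative to the previous run.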

Celery Beat Worker

The Celery management command will need to be augmented to start and stop the beat functionality. In addition, the initialize_app management command will need to be updated to support beat processing.
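As a rough sketch of what starting beat might involve, the helper below builds the command line for a beat process. The helper name and its defaults are assumptions for illustration; the existing management command may assemble and supervise its processes differently.

```python
import shlex


def beat_command(app_module="chirps", loglevel="INFO", scheduler=None):
    """Build the argv for a Celery beat process (hypothetical helper)."""
    argv = ["celery", "-A", app_module, "beat", "--loglevel", loglevel]
    if scheduler:
        # For example "django_celery_beat.schedulers:DatabaseScheduler"
        # when django-celery-beat stores the schedule in the database.
        argv += ["--scheduler", scheduler]
    return argv


print(shlex.join(beat_command()))
```

The resulting list could be handed to subprocess.Popen so that the command can later terminate the same process when stopping beat.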

Worker Resource Tracking

A new model will be added to track which workers are available to Celery during application startup. These are the workers that the new worker ping task will check.

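from django.db import models

class Worker(models.Model):
    # Fields must be assigned, not annotated; max_length=256 is an assumed value
    celery_name = models.CharField(max_length=256)
    last_ping = models.DateTimeField(null=True, blank=True)
    available = models.BooleanField(default=False)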

Detecting Workers

Within the WorkerConfig class, the ready(self) method will be overridden in order to perform one-time initialization during application startup. The pseudo-code for ready() is as follows:

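from django.apps import AppConfig

from chirps.celery import app


class WorkerConfig(AppConfig):
    name = 'worker'

    def ready(self):
        # Models can only be imported once the app registry is ready
        from .models import Worker

        # Delete all of the previous workers
        Worker.objects.all().delete()

        # Query Celery to find out what workers are available
        celery_inspection = app.control.inspect()
        celery_statuses = celery_inspection.stats()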

The call to celery_inspection.stats() returns information about all of the workers that are currently running. The response is a dictionary keyed by worker node name (w1@fc74cd718b1d in the example below).

{'w1@fc74cd718b1d': 
 {'total': {'scan.tasks.scan_task': 1}, 
  'pid': 33832, 'clock': '16633', 'uptime': 16961, 
  'pool': {
      'implementation': 'celery.concurrency.prefork:TaskPool', 
      'max-concurrency': 4, 
      'processes': [
                33960,
                33981,
                34030,
                34091
      ], 
      'max-tasks-per-child': 'N/A', 'put-guarded-by-semaphore': False, 'timeouts': [
                0,
                1800
      ], 
      'writes': {
          'total': 1, 
          'avg': '1.00', 
          'all': '1.00', 
          'raw': '1',
          'strategy': 'fair', 
          'inqueues': {
              'total': 4, 'active': 0
          }
      }
  }, 
  'broker': {
      'hostname': '127.0.0.1', 
      'userid': 'guest', 
      'virtual_host': '/', 
      'port': 5672, 
      'insist': False, 
      'ssl': False, 
      'transport': 'amqp', 
      'connect_timeout': 4, 
      'transport_options': {}, 
      'login_method': 'PLAIN', 
      'uri_prefix': None, 
      'heartbeat': 120.0, 
      'failover_strategy': 'round-robin',
      'alternates': []
  }, 
  'prefetch_count': 16, 
  'rusage': {
      'utime': 64.171124, 
      'stime': 16.814911, 
      'maxrss': 105964, 
      'ixrss': 0, 
      'idrss': 0, 
      'isrss': 0, 
      'minflt': 59910, 
      'majflt': 0, 
      'nswap': 0, 
      'inblock': 0, 
      'oublock': 16, 
      'msgsnd': 0, 
      'msgrcv': 0, 
      'nsignals': 0, 
      'nvcsw': 20866, 
      'nivcsw': 15112
  }
 }
}
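A minimal sketch of how such a reply might be consumed is shown here. It assumes the reply shape shown above and that stats() returns None when no worker answers in time; the function names are hypothetical. In ready(), each returned name would presumably become a Worker row, though the ticket does not spell out that step.

```python
def worker_names(stats):
    """Return sorted worker names from an inspect().stats() reply.

    A reply of None (no worker answered) is treated as "no workers".
    """
    return sorted(stats or {})


def pool_size(stats, name):
    """Return the pool's max concurrency for one worker, or None if absent."""
    return (stats or {}).get(name, {}).get("pool", {}).get("max-concurrency")


# Trimmed copy of the reply shown above.
sample = {
    "w1@fc74cd718b1d": {
        "pid": 33832,
        "pool": {"max-concurrency": 4, "processes": [33960, 33981, 34030, 34091]},
    }
}

print(worker_names(sample))
print(pool_size(sample, "w1@fc74cd718b1d"))
print(worker_names(None))
```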

Worker Ping Task

A new tasks.py file will be added to the worker application. Within it, a function will build a list of names from the Worker table and then issue a ping() operation to verify they are online.

Pseudocode

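from django.utils import timezone

from chirps.celery import app
from .models import Worker


@app.task
def ping_task():
    """Ping each known worker and record whether it responded."""
    celery_inspection = app.control.inspect()

    for worker in Worker.objects.all():
        result = celery_inspection.ping(destination=[worker.celery_name])
        if not result:
            # No reply: mark the worker as unavailable
            worker.available = False
        else:
            # Reply received: mark it available and record the time
            worker.available = True
            worker.last_ping = timezone.now()

        worker.save()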
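The per-worker update can be exercised without Celery or a database. The sketch below uses a dataclass as a stand-in for the Worker model and a hand-written reply in the shape ping() returns; both the stand-in and the helper name are hypothetical. One detail worth checking in isolation is that a worker that answers is marked available again, not just timestamped.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class WorkerRecord:
    """Stand-in for the Worker model (hypothetical; no database involved)."""
    celery_name: str
    last_ping: Optional[datetime] = None
    available: bool = False


def apply_ping_result(worker, reply, now=None):
    """Update one record from the reply to ping(destination=[name]).

    A reply of None (or one missing this worker) means it did not answer.
    """
    if reply and worker.celery_name in reply:
        worker.available = True
        worker.last_ping = now or datetime.now(timezone.utc)
    else:
        worker.available = False
    return worker


online = apply_ping_result(
    WorkerRecord("w1@fc74cd718b1d"),
    {"w1@fc74cd718b1d": {"ok": "pong"}},
)
offline = apply_ping_result(WorkerRecord("w2@example", available=True), None)
print(online.available, offline.available)
```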

Worker Status View

With the new periodic task in place, the view that fetches worker status will query the database instead of querying the workers themselves, so page loads no longer wait on Celery.
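As an illustration of what the view could return, the sketch below turns stored worker rows into a JSON-friendly summary. It assumes rows are available as dictionaries carrying the Worker model's fields (for instance from a values() query); the function name and the output shape are hypothetical.

```python
from datetime import datetime, timezone


def worker_status_summary(workers):
    """Summarize stored worker rows for a status endpoint."""
    rows = [
        {
            "name": w["celery_name"],
            "available": bool(w["available"]),
            "last_ping": w["last_ping"].isoformat() if w["last_ping"] else None,
        }
        for w in workers
    ]
    return {"workers": rows, "any_available": any(r["available"] for r in rows)}


# Hand-written rows standing in for a database query.
rows = [
    {
        "celery_name": "w1@fc74cd718b1d",
        "available": True,
        "last_ping": datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc),
    },
    {"celery_name": "w2@example", "available": False, "last_ping": None},
]
summary = worker_status_summary(rows)
print(summary["any_available"])
```

Because this reads only what the ping task last stored, the reported status can lag reality by up to one schedule interval.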