freelawproject / courtlistener

A fully-searchable and accessible archive of court data including growing repositories of opinions, oral arguments, judges, judicial financial records, and federal filings.
https://www.courtlistener.com

Celery throttle counts failed tasks when it should only count ones awaiting workers #3313

Open mlissner opened 11 months ago

mlissner commented 11 months ago

Our Elastic indexer got stuck today and wouldn't send any more cases to Celery. I did some work trying to figure out why. I think I was able to prove to myself that the issue is failed tasks that are populating the queue.

You can check the length of Celery's queues by using llen on the etl_tasks key in Redis, and you can do a slightly better job using my answer here:

https://stackoverflow.com/a/43420719/64911
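For reference, a minimal sketch of that kind of length check. It assumes the kombu Redis transport defaults for priority queues (steps 0, 3, 6, 9, joined to the queue name with "\x06\x16"); `priority_queue_names` and `get_queue_length` are illustrative names, not CourtListener code, and the linked answer may differ in its details.

```python
from __future__ import annotations

from typing import Protocol

# Assumptions: these mirror the defaults of kombu's Redis transport
# (priority steps 0/3/6/9, joined to the queue name with "\x06\x16").
PRIORITY_SEP = "\x06\x16"
DEFAULT_PRIORITY_STEPS = (0, 3, 6, 9)


class SupportsLlen(Protocol):
    """Anything with a Redis-style LLEN method, e.g. a redis-py client."""

    def llen(self, name: str) -> int: ...


def priority_queue_names(queue: str, steps=DEFAULT_PRIORITY_STEPS) -> list[str]:
    """Return the Redis list keys that may hold messages for one queue."""
    # Priority 0 uses the bare queue name; other steps get a suffix.
    return [queue if pri == 0 else f"{queue}{PRIORITY_SEP}{pri}" for pri in steps]


def get_queue_length(r: SupportsLlen, queue: str = "etl_tasks") -> int:
    """Sum LLEN over every priority key for the queue."""
    return sum(r.llen(name) for name in priority_queue_names(queue))
```

With the interface used elsewhere in this thread, that would be called as `get_queue_length(make_redis_interface("CELERY"))`. Note that this counts every message in the lists; it cannot tell which ones are actually waiting for a worker.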

But when you inspect the queue a bit, it starts to feel like the workers are a bit bored. They're not doing much, and I think the reason is that Celery stores failed tasks in the same key as live ones, until they expire. So, although there are 20k tasks in the queue right now, almost none of them are ready to go.

The CeleryThrottle class doesn't make this distinction, so if a bunch of tasks fail, the throttle checks the length of the queue, finds that it is very long, and then doesn't add any more items to the queue. But it's only long because of failed tasks that haven't expired yet.
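To make the failure mode concrete, here is a hypothetical, simplified stand-in for a length-based throttle. It is not the actual CeleryThrottle implementation; the class name, parameters, and polling logic are assumptions for illustration only.

```python
import time
from typing import Callable


class QueueLengthThrottle:
    """Hypothetical, simplified stand-in for a queue-length throttle."""

    def __init__(
        self,
        get_length: Callable[[], int],
        min_items: int = 100,
        poll_interval: float = 1.0,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.get_length = get_length
        self.min_items = min_items
        self.poll_interval = poll_interval
        self.sleep = sleep

    def maybe_wait(self) -> int:
        """Block until the queue is shorter than min_items.

        Returns the number of times the queue length was checked.
        """
        polls = 0
        while True:
            polls += 1
            # The throttle only sees a raw count. It has no idea whether
            # the items it is counting are runnable.
            if self.get_length() < self.min_items:
                return polls
            self.sleep(self.poll_interval)
```

If the reported length stays above `min_items` because of items no worker will pick up soon, `maybe_wait` never returns and the producer stops enqueueing, which matches the stuck indexer described above.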

I'm not sure what the solution to this is. I think the only way to identify a failed task is to grab it, deserialize it, and inspect it. If there are a lot of tasks on the queue, well, this will be very slow.

On the other hand, intuitively, there must be a way to do this. The celery workers themselves are able to somehow figure out which tasks are failed and which are not — otherwise, the workers wouldn't know which tasks to execute next. But how?

A few alternatives, I guess:

  1. Should we make sure that none of our tasks save their failed state in the queue? I think you can do this with the celery decorator (ignore_result, maybe?), but I forget if it affects things like retries or something.

  2. Is there a way to save failed tasks to a different queue, perhaps?

  3. Maybe we can find a fast way to inspect the number of tasks waiting to get run.

  4. Maybe we file a bug with Celery and hope they're interested in fixing it (unlikely).

  5. Switch to a different task queue? We use rq in bots.law.

mlissner commented 11 months ago

Discussed this a bit here. Hopefully we'll get replies: https://github.com/celery/celery/discussions/8595

mlissner commented 11 months ago

@albertisfu, I think we should set ignore_result for all celery tasks that aren't used in canvases or chords.

I think what's happening is that when they fail they fill up the queue and that blocks the throttle from indexing more content until the failures expire.

albertisfu commented 11 months ago

@mlissner, to figure out the origin of all the tasks that are currently in the etl_tasks queue, we could use the following code:

import json
import os

from django.conf import settings

from cl.lib.redis_utils import make_redis_interface

r = make_redis_interface("CELERY")
queue_name = "etl_tasks"

# Slice of the queue to sample. Note that LRANGE bounds are inclusive.
start_index = 0
end_index = 10000
tasks = r.lrange(queue_name, start_index, end_index)

# Write the sample under MEDIA_ROOT/es_tasks, creating the folder if needed.
es_path = os.path.join(settings.MEDIA_ROOT, "es_tasks")
os.makedirs(es_path, exist_ok=True)

file_path = os.path.join(es_path, "etl_tasks.txt")
with open(file_path, "w") as file:
    for index, task_data in enumerate(tasks, start=start_index):
        # Each queue item is a JSON message; task metadata is in "headers".
        headers = json.loads(task_data)["headers"]
        file.write(f"Task Index: {index}\n")
        file.write(f"Task Name: {headers['task']}\n")
        file.write(f"Retries: {headers['retries']}\n")
        file.write(f"Arguments: {headers['argsrepr']}\n")
        file.write("-------------------------\n")

I tested it locally and it works properly. It returns a sample of the tasks in the queue from start_index to end_index. Newer tasks are at the top, so, for instance, start_index = 0 and end_index = 10000 will return the 10,000 newest tasks in the queue.

The output includes data that can help us trace the origin of these tasks, including the task name, number of retries, and the task arguments. It will be stored in a text file in a media folder called es_tasks.

With this file, I can analyze the tasks and try to figure out what's wrong.

mlissner commented 11 months ago

Here's the output, thank you:

etl_tasks.txt

albertisfu commented 11 months ago

Thanks for the file, I've analysed it.

I created a script to look for duplicated tasks in this file: find_duplicates_3.txt

Here are the results: duplicate-tasks.txt

Out of the 10,000 tasks, 5,035 are duplicates. This means we are, on average, triggering twice the number of required tasks. Depending on the type of content and source, some tasks can be repeated more than two times, as described below.

After examining some duplicated tasks, I've identified the following issues so far:

I'll work on fixing these issues. Afterward, we should see a decrease of around 50% on average in the number of ES indexing tasks.

mlissner commented 11 months ago

Wow! Great investigation, and I bet getting these out of the queue will help with our bulk indexing too. How do we know if we've successfully stopped this problem and will we be able to know if it has regressed?

albertisfu commented 11 months ago

Yeah, I believe that once we remove all these repeated tasks from the queue, the ES load will decrease, and the indexing rate should improve.

To verify that the issue has been resolved after implementing the above solutions, we can sample new tasks from the queue and use the script to search for duplicates. The number of duplicates should be significantly reduced (almost zero). If there are still a few duplicates, they could be related to actual data changes occurring within a short period of time. We'll be able to confirm that once we examine the results.
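As a rough illustration of that check (not the attached find_duplicates_3.txt script, whose contents aren't shown here), the sketch below counts duplicates in a sample of raw queue messages. It assumes two messages are "the same task" when their `task` and `argsrepr` headers match, and that every occurrence after the first counts as a duplicate; `task_key` and `duplicate_report` are hypothetical names.

```python
import json
from collections import Counter


def task_key(raw_message):
    """Identify a task by name and argument repr.

    Assumption: matching "task" and "argsrepr" headers mean the same work.
    """
    headers = json.loads(raw_message)["headers"]
    return headers["task"], headers["argsrepr"]


def duplicate_report(raw_messages):
    """Summarize how many messages in a sample repeat an earlier one."""
    counts = Counter(task_key(m) for m in raw_messages)
    total = sum(counts.values())
    # Every occurrence beyond the first of a key counts as a duplicate.
    duplicates = total - len(counts)
    return {
        "total": total,
        "duplicates": duplicates,
        "rate": duplicates / total if total else 0.0,
        "repeated": {key: n for key, n in counts.items() if n > 1},
    }
```

Fed with the output of `r.lrange("etl_tasks", start_index, end_index)` before and after the fixes, the `rate` value gives one number to compare, and `repeated` shows which task and argument combinations are still being triggered more than once.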