minvws / nl-kat-coordination

Repo nl-kat-coordination for minvws
European Union Public License 1.2
121 stars 55 forks source link

Batched jobs for scheduler and task runner #2613

Open jpbruinsslot opened 4 months ago

jpbruinsslot commented 4 months ago

Prerequisites:

Possible solutions:

  1. Push a batched task onto queue with lowest possible priority. Update the task in place and append ooi to list when similar Boefje task comes along.
  2. Let runner pop batched tasks from queue by filtering (and possibly aggregating)
  3. Delay tasks by setting status to DELAYED and when threshold is reached (e.g. time or number of OOI's) push to queue.
Donnype commented 4 months ago

As has been discussed with @jpbruinsslot and @underdarknl, one of the main reasons of this feature is the ability to create a batched job, meaning a job targeting one boefje type for multiple input OOIs. Given that we are moving to a setup where running one job will have significant overhead due to containerization, this will likely introduce a welcome performance gain.

[!NOTE] Let me first note that I believe that this should actually be benchmarked first or based on production data about the amount of jobs run per day and the spread that occurs, because else assuming this will be a bottleneck is merely guesswork. But I'll ignore those principles for now.

There are currently two possible routes.

Route 1: Build batched jobs in the scheduler queue

As proposed by @underdarknl, one way to create a batched job is to patch them in the scheduler. This would mean:

Performance scenario's

The queue is very full and a "batchable" job gets added

In this case, the batched jobs get a lower priority so will depend on the last_run-logic to be scheduled at some point at least. Probably the new job will either be appended to an existing one or goes to the bottom of the queue. Meanwhile, this job gets lots of oois and will not be scheduled right away. Once it gets run, the graph has "stabilized" and the job will do a large batch at once. Note that if the queue fluctuates a lot, it is not guaranteed the job will not be picked up unless the queue stabilized.

The queue is empty and a "batchable" job gets added

In this case, batched jobs still get a lower priority, but it gets scheduled right away because it is the only job on the queue. There is likely next to no performance gain in this case.

Features required to implement:

I would not opt for having 2 kinds of jobs data in the scheduler, because this will imply that we will have to maintain two sets of queries whenever we want to find jobs for a certain input_ooi. Therefore let's assume all jobs become "batched" jobs. We have to:

  1. Add a boefje flag in the boefje_meta that communicates whether it supports batched jobs
  2. Change the priority logic to take this into account when adding a priority
  3. Think about how to handle the priority when important input_oois are added to a job with not so important input oois
  4. Change the data structure of json of the prioritized item:
    • The uuids in the scheduler tasks and prioritized items (boefje_meta/normalizer_meat) are currently the same. If we decide to create a list of jobs in the json field, we should loosen this implicit correlation.
    • If we do not do this, we should probably change the boefje_meta signature (at least for the scheduler and boefjes). I think this will be too tedious as this model is everywhere in the code.
  5. Assuming we decide to loosen the uuid-pk correlation, we must check for usages and update at least the Rocky code that reschedules tasks. We should in general update the Rocky code to handle these kinds of tasks and API changes.
  6. Update the boefje worker to accept the new data model and pass it to the right implementation in the handlers, defaulting to a naive for-loop
  7. Create a migration plan for these kinds of jobs.

Point 7 will not be a big deal if we do not decide to handle 1 data structure and allow both batched and non-batched jobs. Again this would require switch statements somewhere handled generically in the scheduler when querying the queue. But also in rocky and boefjes while fetching jobs from the scheduler API.


Route 2: Aggregate in the worker and change the queue API

Another way would be to leave the data structures intact and query the scheduler queue API in a smarter way:

Performance scenario's

The queue is very full and a "batchable" job gets added

In this case, either there are several jobs for the same boefje on the queue because the queue has piled up, or there are no jobs because one of the workers has just picked up the batch for this boefje. (This chance could of course be lowered by doing the queue-query in a smart way.) It gets added to the queue and will be picked up at some point together with all other jobs for the same boefje_id. This will likely be a rather large batch, but we might end up running several medium-sized batches instead of one large batch. But if we end up doing several of those, the queue was probably not that full, as we will probably see that the bigger the queue, the bigger the batch sizes. In the worst case you sometimes do a batch of 1.

The queue is empty and a "batchable" job gets added

In this case, again the job gets scheduled right away because it is the only job on the queue. There is likely next to no performance gain in this case.

Features required to implement:

  1. Update the scheduler API to add an endpoint or some sort of pop-many functionality that allows the preferred way to filter/group using the boefje_id
  2. Think about how to handle the priority logic for such jobs in the scheduler, or change the worker to loop over all enabled boefjes
  3. Update the handler to grab a batch of jobs using the new API instead of popping 1 job, and then run the batched implementation for the boefje if it exists, again defaulting to a naive for-loop.

Note that an added benefit to using a limit-api is that you are able to control how many tasks you do in parallel with just a limit to the api call, without needing chunking-logic in the worker to perhaps later handle instances where batched jobs get resource intensive per ooi so that you want to limit the batch size.

jpbruinsslot commented 3 months ago

Further steps investigate possibility of /pop endpoint returning jobs of the same type https://github.com/minvws/nl-kat-coordination/issues/2811

jpbruinsslot commented 2 months ago

To summarize:

We want the job runner to receive all similar jobs from the scheduler of type X if type X is the job the scheduler would return if asked for a single job, and if job X's boefje has a flag that tells us batched jobs are in order. This would allow the job runner to receive a ilst of these jobs, combine them (if the job runner can handle that, else just execute them one by one), and send the newly combined job to the boefje for processing. The boefje would then do its internal optimized magic to be more efficient when dealing with large amounts of input-objects.

This optimization would allow for example the RPKI boefje to ingest all IP-addresses at the same time, parsing and updating its RPKI-input data only once, sort the given IP's, and iterate over all the ipranges listed in the rpki-source instead of over the ip-addresses for each original job, while still creating a correct answer for each ipaddresses based on its iprange.

From: https://github.com/minvws/nl-kat-coordination/issues/2811#issuecomment-2066012253