q-m / scrapyd-k8s

Scrapyd on container infrastructure
MIT License

scheduling #36

Open vlerkin opened 2 weeks ago

vlerkin commented 2 weeks ago

What happens in the PR:

  1. The event-watcher logic was separated into an observer class; the log-watching logic stayed in a log handler class, but its initialization was changed to subscribe to events when the joblogs feature is configured;
  2. A new class, KubernetesScheduler, was created to decide when jobs must be unsuspended, and in what order;
  3. The scheduler endpoint was modified; logic was added to set a value for the start_suspended parameter;
  4. The schedule method of the k8s launcher takes a new start_suspended parameter, whose value is passed in when it is called from the api. New methods were also added: unsuspend_job patches an existing suspended job with suspend=False, get_running_jobs_count returns the number of currently running jobs, list_suspended_jobs returns the jobs whose spec.suspend is true, and _get_job_name extracts the job name from the metadata, which is then used by unsuspend_job (see the sketch after this list).

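To make item 4 concrete, here is a minimal sketch of what these launcher methods could look like, assuming the official `kubernetes` Python client with configuration already loaded. The method names follow the PR description; the class name, namespace handling, and other details are illustrative assumptions, not necessarily what the PR does:

```python
# Sketch of the new launcher methods; assumes the official `kubernetes`
# Python client. The class name and namespace are illustrative assumptions.
from kubernetes import client


class K8sLauncher:
    def __init__(self, namespace: str = "default"):
        self._namespace = namespace
        self._batch = client.BatchV1Api()

    def get_running_jobs_count(self) -> int:
        # Count jobs that currently have at least one active pod.
        jobs = self._batch.list_namespaced_job(namespace=self._namespace)
        return sum(1 for job in jobs.items if (job.status.active or 0) > 0)

    def list_suspended_jobs(self) -> list:
        # Jobs created with spec.suspend=True sit in the queue, not running.
        jobs = self._batch.list_namespaced_job(namespace=self._namespace)
        return [job for job in jobs.items if job.spec.suspend]

    def _get_job_name(self, job) -> str:
        # Extract the job name from the metadata, used by unsuspend_job.
        return job.metadata.name

    def unsuspend_job(self, job_name: str) -> None:
        # Patch suspend=False so the job controller starts the pods.
        self._batch.patch_namespaced_job(
            name=job_name,
            namespace=self._namespace,
            body={"spec": {"suspend": False}},
        )
```
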
The big picture: the event watcher connects to the k8s api and receives a stream of events; when a new event arrives, it notifies its subscribers by passing the event to the callback each of them provided. The subscriber, KubernetesScheduler, receives events in its handle_pod_event method, which reacts to changes in job status: when a job completes or fails, it calls check_and_unsuspend_jobs, which checks capacity and unsuspends jobs until the allowed number of parallel jobs is reached. While doing this it relies on get_next_suspended_job_id to pick the next suspended job in the order the jobs were originally scheduled.

When a job is scheduled, it either runs immediately or goes into the queue of suspended jobs (a native k8s mechanism), depending on the number of currently active jobs and the max_proc value from the config (default 4). Events that change the number of active jobs then trigger the KubernetesScheduler logic, which unsuspends suspended jobs until the desired state (the target number of parallel jobs) is reached. A sketch of this flow follows below.
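
A minimal sketch of that flow, using the K8sLauncher sketch above. The method names come from this PR, but the event shape and the capacity accounting are assumptions:

```python
# Sketch of the KubernetesScheduler flow; method names follow the PR,
# everything else (event shape, capacity accounting) is an assumption.
class KubernetesScheduler:
    def __init__(self, launcher, max_proc: int = 4):
        self.launcher = launcher
        self.max_proc = max_proc  # from the config; default 4 per the PR

    def handle_pod_event(self, event):
        # Called by the event watcher for every event it streams.
        phase = event["object"].status.phase
        if phase in ("Succeeded", "Failed"):
            # A slot may have freed up; try to start queued jobs.
            self.check_and_unsuspend_jobs()

    def check_and_unsuspend_jobs(self):
        # Unsuspend jobs until the allowed parallelism is reached.
        # (A real implementation must also account for jobs that were
        # just unsuspended but are not reported as active yet.)
        while self.launcher.get_running_jobs_count() < self.max_proc:
            job_name = self.get_next_suspended_job_id()
            if job_name is None:
                break  # nothing left in the queue
            self.launcher.unsuspend_job(job_name)

    def get_next_suspended_job_id(self):
        # Pick the suspended job scheduled first, preserving order.
        suspended = self.launcher.list_suspended_jobs()
        if not suspended:
            return None
        oldest = min(suspended, key=lambda j: j.metadata.creation_timestamp)
        return self.launcher._get_job_name(oldest)
```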

wvengen commented 2 weeks ago

Hope my feedback was at an angle that helps you at this stage. In any case, well done, keep it going!

p.s. the CI error looks like it could be caused by Kubernetes-specific things having entered the main api code, which wouldn't work when running with Docker.

vlerkin commented 2 weeks ago

Working on the Docker implementation, to be added to this PR.

vlerkin commented 6 days ago

I ran into problems because I partially split this PR, and now I have a multiverse of versions that I need to consolidate into a single source of truth. Going to spend an uncertain amount of time on that.

wvengen commented 6 days ago

The way I would do this:

  1. Continue working on this PR, until you need the functionality developed in the other PR (or until it has been merged).
  2. Interactively rebase onto the branch of the other PR, filtering out the commits here that you rewrote in the other branch.
  3. There may be little or much work in resolving conflicts. If there are really many, across various commits, you may consider another route (see below).
  4. Test, done.

If this is much work across many commits, you may consider first doing an interactive rebase of this PR, to simplify it and reduce the number of commits (each of which may need amending). A rough sketch of the commands follows below.
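
For concreteness, the rebase route could look roughly like this (illustrative commands only; the branch names are placeholders):

```sh
# Illustrative only; branch names are placeholders.
git checkout feature-branch                  # this PR's branch
git rebase -i other-pr-branch                # drop commits rewritten in the other PR
# ...resolve conflicts as they appear, then:
git rebase --continue

# If that turns out to be too messy, first simplify this PR's own history:
git rebase -i $(git merge-base main feature-branch)
```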

Yes, this is a bit of work, but something I come across now and then, in various projects. Sorry for the complexity!

vlerkin commented 6 days ago

Thank you for the advice! I was thinking of dropping the commit that merged main into this branch, then making the code work so the tests pass if needed. Then merging in the other branch that refactored the observer further, making the code of both branches work together, and then checking whether there are any conflicts with main and resolving those. This is a bit longer than simply redoing the merge with the main branch, but I messed up the last one because I lost track of the changes, so gradually rebuilding this branch is a bit easier for me.

No worries, I'm the one who messed up the merging; complexity is part of the job :D Learning to make more granular commits and cleaner PRs the hard way :D

vlerkin commented 1 hour ago

I modified one of the methods in the scheduler (get_next_suspended_job_id) to handle the case where a job has no creation_timestamp. This is not expected, but if someone used a custom resource and forgot to add this field, or made some other error, the job gets a timestamp assigned and is processed like the other jobs in the queue.
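
For reference, the fallback could look something like this sketch (the actual code in the PR may differ; the surrounding method shape is assumed from the earlier description):

```python
from datetime import datetime, timezone

def get_next_suspended_job_id(self):
    # Sketch of the described fallback; details are illustrative.
    suspended = self.launcher.list_suspended_jobs()
    if not suspended:
        return None

    def creation_time(job):
        # Jobs normally carry metadata.creation_timestamp; if a custom
        # resource lacks it, assign "now" so the job still sorts into
        # the queue and gets processed like any other job.
        return job.metadata.creation_timestamp or datetime.now(timezone.utc)

    oldest = min(suspended, key=creation_time)
    return self.launcher._get_job_name(oldest)
```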

Also, there are now unit tests that cover different scenarios for the scheduler.

If you have any other comments for improvements, let me know!