@lee0c gave an update in our community meeting today (🎉 thanks @lee0c ) as he's been working with a customer on using queue-based scaling.
He found that you can handle SIGTERM (via a termination signal channel) together with a long termination grace period to delay termination of a deployment's pods by the HPA for a very long time. However, the pod displays as "Terminating..." the entire time, which may confuse operators who could inadvertently delete something that looks stuck but is simply terminating for hours.
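For illustration only, a minimal sketch of a consumer that handles SIGTERM this way (the work loop below is a placeholder, not code from the customer engagement):

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM)

	stop := make(chan struct{})
	done := make(chan struct{})

	// Placeholder work loop standing in for the real queue consumer.
	go func() {
		defer close(done)
		for {
			select {
			case <-stop:
				return
			default:
				time.Sleep(time.Second) // "process" one message
			}
		}
	}()

	<-sigs // Kubernetes sent SIGTERM ahead of pod termination
	log.Println("SIGTERM received, draining in-flight work")
	close(stop)
	<-done // give the current message time to finish before exiting
}

The pod's terminationGracePeriodSeconds has to be raised to cover however long the drain may take, which is exactly why the pod can sit in "Terminating..." for hours.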
He's also built out a PoC around using Kubernetes Jobs and creating them for some number of items in the queue. The idea is that a job is created, it consumes a single or small batch of items, runs to completion, and terminates. It would be up to some controller to create jobs based on metrics, and then clean them up as needed. At least here though the job status accurately reflects the work being done.
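As a rough sketch of that idea (not the PoC itself; createConsumerJob, the name prefix, and the client-go wiring here are assumptions), a controller could create one run-to-completion Job per item or batch along these lines:

import (
	batch_v1 "k8s.io/api/batch/v1"
	core_v1 "k8s.io/api/core/v1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// createConsumerJob creates a single run-to-completion Job wrapping the
// consumer pod spec. Hypothetical helper for illustration only.
func createConsumerJob(kubeClient kubernetes.Interface, namespace string, consumerSpec core_v1.PodSpec) error {
	one := int32(1)
	podSpec := consumerSpec
	podSpec.RestartPolicy = core_v1.RestartPolicyNever // Jobs require Never or OnFailure

	job := batch_v1.Job{
		ObjectMeta: meta_v1.ObjectMeta{
			GenerateName: "keda-consumer-", // name prefix is an assumption
			Namespace:    namespace,
		},
		Spec: batch_v1.JobSpec{
			Parallelism: &one,
			Completions: &one,
			Template:    core_v1.PodTemplateSpec{Spec: podSpec},
		},
	}
	_, err := kubeClient.BatchV1().Jobs(namespace).Create(&job)
	return err
}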
@lee0c is going to come up with a proposal or some thoughts about how this could be integrated with KEDA directly. We'll use this issue to discuss. A few questions from me:
Thanks @lee0c - let me know your thoughts on this feature
For easy finding, there is some discussion around this issue in a long thread on scaling down specific pods:
@jeffhollan Wanted to finally get this in so excuse the lack of polish, I was out sick earlier this week & my schedule took a bit of a hit! Here's all the notes I have on KEDA design to add jobs. A lot of the major changes are things I have already implemented or mostly implemented, most of what would be left is smaller tweaks and more testing.
Notable changes:
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: functions-example-scaledobject
  labels: # Deployment scale type only
    deploymentName: functions-example-deployment
spec:
  scaleType: job # Optional. Default: deployment
  scaleTargetRef: # Deployment scale type only
    deploymentName: functions-example-deployment
  # General settings
  pollingInterval: 30  # Optional. Default: 30 seconds
  minReplicaCount: 0   # Optional. Default: 0
  maxReplicaCount: 100 # Optional. Default: 100
  triggers: []
  # Deployment specific
  cooldownPeriod: 300  # Optional. Default: 300 seconds
  # Job specific
  parallelism: 1       # Optional. Default: 1 (no parallelism)
  completions: 1       # Optional. Default: 1
  activeDeadline:      # Optional. In seconds. Default is unset.
  backoffLimit: 6      # Optional. Kubernetes default is 6
  # Sets the podSpec of the consumer pod the jobs will manage.
  # This will take any valid consumer spec that can be deployed to
  # the cluster.
  # There is no need to specify job-specific settings such as
  # RestartPolicy
  consumerSpec:
    containers:
    - name: long-running-consumer
      image: lee0c/sbq-consumer
      envFrom:
      - secretRef:
          name: servicebus-connection
The ScaledObject API file adds these fields and two constants:
// Scale type constants
const (
	ScaleTypeDeployment string = "deployment"
	ScaleTypeJob        string = "job"
)
Several of the job fields are omitted if empty as they are based on K8s defaults:
type ScaledObjectSpec struct {
	ScaleType       string           `json:"scaleType"`
	ScaleTargetRef  ObjectReference  `json:"scaleTargetRef"`
	PollingInterval *int32           `json:"pollingInterval"`
	CooldownPeriod  *int32           `json:"cooldownPeriod"`
	MinReplicaCount *int32           `json:"minReplicaCount"`
	MaxReplicaCount *int32           `json:"maxReplicaCount"`
	Parallelism     *int32           `json:"parallelism,omitempty"`
	Completions     *int32           `json:"completions,omitempty"`
	ActiveDeadline  *int32           `json:"activeDeadline,omitempty"`
	BackOffLimit    *int32           `json:"backOffLimit,omitempty"`
	Triggers        []ScaleTriggers  `json:"triggers"`
	ConsumerSpec    *core_v1.PodSpec `json:"consumerSpec,omitempty"`
}
Nothing in the controller needs to change.
Nothing in the adapter needs to change.
For jobs, no HPA need be created. With the ScaleType field, this is easy to control via switch or if statements.
// WatchScaledObjectWithContext runs a handleScaleLoop go-routine for the scaledObject
func (h *ScaleHandler) WatchScaledObjectWithContext(ctx context.Context, scaledObject *keda_v1alpha1.ScaledObject, isDue bool) {
	switch scaledObject.Spec.ScaleType {
	case keda_v1alpha1.ScaleTypeJob:
		h.createOrUpdateJobInformerForScaledObject(scaledObject)
	default:
		h.createOrUpdateHPAForScaledObject(scaledObject)
	}

	go h.handleScaleLoop(ctx, scaledObject, isDue)
}
// HandleScaledObjectDelete handles any cleanup when a scaled object is deleted
func (h *ScaleHandler) HandleScaledObjectDelete(scaledObject *keda_v1alpha1.ScaledObject) {
	switch scaledObject.Spec.ScaleType {
	case keda_v1alpha1.ScaleTypeJob:
		h.deleteJobsForScaledObject(scaledObject)
	default:
		h.deleteHPAForScaledObject(scaledObject)
	}
}
HPA creation is replaced with creation of a Job informer that cleans up jobs after completion. HPA deletion is replaced with deletion of the jobs and of the job informer.
func (h *ScaleHandler) deleteJobsForScaledObject(scaledObject *keda_v1alpha1.ScaledObject) {
	// Stop the job informer
	if h.jobInformerStopCh != nil {
		close(h.jobInformerStopCh)
	}

	// Delete all running jobs for this scaled object
	propagationPolicy := meta_v1.DeletePropagationBackground
	err := h.kubeClient.BatchV1().Jobs(scaledObject.GetNamespace()).DeleteCollection(
		&meta_v1.DeleteOptions{
			PropagationPolicy: &propagationPolicy,
		},
		meta_v1.ListOptions{
			// TODO: job labels
			LabelSelector: "",
		},
	)
	if err != nil {
		log.Errorf("Failed to delete jobs of ScaledObject %s", scaledObject.GetName())
	}
}
The job informer only reacts to updates, & deletes a job after it completes:
func (h *ScaleHandler) createOrUpdateJobInformerForScaledObject(scaledObject *keda_v1alpha1.ScaledObject) {
	sharedInformerFactory := informers.NewSharedInformerFactory(h.kubeClient, time.Second*time.Duration(defaultJobInformerResync))
	jobInformer := sharedInformerFactory.Batch().V1().Jobs().Informer()

	jobInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			job := newObj.(*batch_v1.Job)
			if job.Status.CompletionTime != nil {
				// TODO: job labels
				propagationPolicy := meta_v1.DeletePropagationBackground
				err := h.kubeClient.BatchV1().Jobs(scaledObject.GetNamespace()).Delete(job.Name, &meta_v1.DeleteOptions{
					PropagationPolicy: &propagationPolicy,
				})
				if err != nil {
					log.Errorf("Failed to delete job %s", job.Name)
					return
				}
				log.Infof("Cleaned up job %s", job.Name)
			}
		},
	})

	// The stop channel is closed in deleteJobsForScaledObject when the ScaledObject is deleted
	h.jobInformerStopCh = make(chan struct{})
	go jobInformer.Run(h.jobInformerStopCh)
}
This will need:
Additionally, job creation will be handled in this loop. This will need to be fine-tuned to work well with parallelism & completions (we could implement a 1:1 mapping first, then upgrade), but it will otherwise be similar to my proof of concept - I'm cleaning up a few things and will link it in a comment here later tonight.
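To make the shape concrete, here is a sketch of what the 1:1 version of that branch of the loop could look like (countPendingOrRunningJobs and createConsumerJob are hypothetical helpers, and the metric plumbing is elided; this is not the PoC code):

// scaleJobs creates one Job per queued item, capped at MaxReplicaCount.
// Sketch only; assumes defaulting has already populated MaxReplicaCount.
func (h *ScaleHandler) scaleJobs(scaledObject *keda_v1alpha1.ScaledObject, queueLength int64) {
	desired := queueLength
	if max := int64(*scaledObject.Spec.MaxReplicaCount); desired > max {
		desired = max
	}

	running := h.countPendingOrRunningJobs(scaledObject) // hypothetical helper, returns int64
	for i := running; i < desired; i++ {
		if err := h.createConsumerJob(scaledObject); err != nil { // hypothetical helper
			log.Errorf("Failed to create job for ScaledObject %s: %s", scaledObject.GetName(), err)
		}
	}
}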
Scalers do not need to change.
Oh, link to current jobs manager (consumer folder is empty rn, consumer image is in docker hub though): https://github.com/lee0c/k8s-jobs
Thanks @lee0c for putting this together. I think this looks good to me. Only 2 questions I have:
parallelism is how many concurrent jobs at a maximum may run. The other ones I'm not as sure about:
# Job specific
parallelism: 1 # Optional. Default: 1 (no parallelism)
completions: 1 # Optional. Default: 1
activeDeadline: # Optional. In seconds. Default is unset.
backoffLimit: 6 # Optional. Kubernetes default is 6
Let me know - thanks for putting this together!
/cc - @yaron2 @Aarthisk @anirudhgarg @ahmelsayed
Should ScaleType be mandatory (breaking change)?
IMO it should not be mandatory. If for any reason, in general, we do decide on breaking changes, then we should introduce new objects with a different apiVersion.
Should users specify a job name base used to generate job names?
How about we use the .metadata.name of the ScaledObject as the base name?
Should parallelism & completions be included (they make scaling decisions tough), or should users control scaling by controlling how their consumer ingests events?
In terms of a Kubernetes Job, we should aim for work queue based parallelism, meaning we should not allow for fixed completions. parallelism should be decided by the scaler metric.
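A small sketch of that rule (the function and names here are assumptions, not proposed code): Parallelism is derived from the scaler metric, capped at maxReplicaCount, and Completions is simply never set, which is the work-queue Job pattern.

// workQueueParallelism maps the scaler metric to JobSpec.Parallelism for the
// work-queue pattern; JobSpec.Completions is deliberately left nil.
func workQueueParallelism(queueLength int64, maxReplicas int32) *int32 {
	p := int32(queueLength)
	if p > maxReplicas {
		p = maxReplicas
	}
	return &p
}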
If only a job scale type trigger is specified, should the adapter not be run?
Not sure I understand - the Adapter runs inside the controller pod, but in any case, if the Adapter is not running we won't be able to handle any future non job based workloads.
Should the JobInformer resync time be customizable?
Best to keep it customizable as some workloads may need this type of granularity.
Job TTL after completion isn't customizable in this model - jobs are deleted immediately on completion. Is this worth making customizable?
Yes, same reason as above.
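One way this could be made customizable (sketch only; ttlSeconds would be a new, hypothetical ScaledObject field): the informer handler skips jobs that finished less than the TTL ago and lets a later resync delete them. Kubernetes' own JobSpec.TTLSecondsAfterFinished could serve the same purpose where the cluster supports it.

// jobReadyForCleanup reports whether a completed job has outlived the TTL and
// can be deleted by the informer handler.
func jobReadyForCleanup(job *batch_v1.Job, ttlSeconds int32) bool {
	if job.Status.CompletionTime == nil {
		return false // job still running
	}
	return time.Since(job.Status.CompletionTime.Time) >= time.Duration(ttlSeconds)*time.Second
}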
@jeffhollan
parallelism: 1 # Optional. Default: 1 (no parallelism)
Number of pods a job will run at once
completions: 1 # Optional. Default: 1
Number of pods that must complete in order for a job to finish
activeDeadline: # Optional. In seconds. Default is unset.
How long a pod can be active w/out succeeding (completing) before it is considered failed & terminated
backoffLimit: 6 # Optional. Kubernetes default is 6
How many times a pod will retry starting in case of a crash or failure
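For reference, a sketch of how those four fields would map onto a batch/v1 JobSpec (illustrative, not the actual implementation; note that ActiveDeadlineSeconds is an *int64 in the Kubernetes API, so the int32 field needs converting):

func jobSpecFromScaledObject(spec keda_v1alpha1.ScaledObjectSpec) batch_v1.JobSpec {
	jobSpec := batch_v1.JobSpec{
		Parallelism:  spec.Parallelism,
		Completions:  spec.Completions,
		BackoffLimit: spec.BackOffLimit,
	}
	if spec.ActiveDeadline != nil {
		deadline := int64(*spec.ActiveDeadline)
		jobSpec.ActiveDeadlineSeconds = &deadline
	}
	if spec.ConsumerSpec != nil {
		jobSpec.Template = core_v1.PodTemplateSpec{Spec: *spec.ConsumerSpec}
	}
	return jobSpec
}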
@jeffhollan The work left really depends on the flexibility we want - the more options & customizability, and the more we want the scaling setup to adapt to many workloads, the more time it will need. A very basic amount of customizability would likely be a day or two of work; more would take a bit longer (& I would likely want extra feedback from @yaron2 or someone else on the team on a few things). Important to note, though, that the customer requirements have changed & will likely no longer require this support - so at this point the decision is more about whether this is a good fit for KEDA with no customer pressure.
Ah very interesting, and thanks for the insight. Curious how the customer is looking to solve this problem? It's still interesting functionality, but I'm almost inclined to say we check it into a release branch and wait for more people to ask for this pattern before fully merging it in. Ideally we have at least one interested user before adopting the code. I suspect they will come, but they may influence some of the design choices as well, and I was hoping this customer could help drive some of that for us here.
The customer is going to be checkpointing work within their app due to several reasons, which removes most of the need to have run-to-completion jobs with no interruption.
That sounds like a good plan. I'll put some more time into it when I'm back in Redmond so it can get to a proper code review, and then it can stay in its own branch for a while. I'm also considering cleaning up my standalone version so that it's easier to use/configure.
As a job author, how do I figure out what number of completions makes sense for my job? My understanding is that jobs run to completion, right? So if we scale to 3, don't all 3 have to finish for the job to be completed? Maybe I'm misunderstanding how this works.
@ahmelsayed Apologies for slow response.
A job can manage multiple pods. The completions metric specifies how many pods, of the pods it is managing, must run to completion for the job to be considered done & end. [This page] goes into more depth about different patterns around k8s jobs, & when they might be good vs. bad
I think you forgot the link to the page :)
You're totally right - here you go: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#job-patterns
@seushermsft happy to assign this to you if you accept the invite to the org
@jeffhollan - Thought I had, but it seems I ignored your invite instead. :) I've joined the org now, so please assign this.
@seushermsft Any updates on this?
@lee0c - PR #322 is out to handle this change - it has one sign-off already. @stgricci is the person on my team making the actual change.
This is now merged so this issue can be closed I presume, right @jeffhollan @ahmelsayed ?
I still don't understand how to implement this, and what the solution is for long-running workloads. I tried a PreStop hook, but there are no logs and no events for it.
@chhayasingh0112 this has been resolved a while ago: https://keda.sh/docs/2.12/concepts/scaling-jobs/
Today KEDA works to scale Deployments that are assumed to continue to consume messages while active. However, this means it's expected the event consumers are written to be very idempotent with no guarantees on how long a deployment will live for (not just because of KEDA, but also because of the HPA as discussed here).
Potentially KEDA could see events are ready to be processed and create one or many Kubernetes Jobs that can then run and process the messages. These containers would be written to consume an event and run to completion.
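For illustration, a minimal sketch of such a run-to-completion consumer (the receive/process functions are placeholders, not a real queue SDK):

package main

import (
	"log"
	"os"
)

func main() {
	msg, ok := receiveOne() // placeholder: pull a single message from the queue
	if !ok {
		log.Println("queue empty, nothing to do")
		os.Exit(0)
	}
	if err := process(msg); err != nil {
		// A non-zero exit marks the pod failed, letting the Job retry up to backoffLimit.
		log.Fatalf("processing failed: %v", err)
	}
	// Exiting 0 marks the pod Succeeded, so the Job counts it toward completions.
}

// Placeholder functions standing in for real queue and business logic.
func receiveOne() (string, bool) { return "example-message", true }
func process(msg string) error   { log.Printf("processed %q", msg); return nil }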